From b9d0800795a405e406e6f42a90d7d96f67d81375 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Mon, 24 Jul 2023 17:29:59 -0700 Subject: [PATCH 1/9] KAFKA-15244: Remove PluginType.from(Class) Signed-off-by: Greg Harris --- .../kafka/connect/runtime/AbstractHerder.java | 41 +++++------- .../connect/runtime/isolation/PluginDesc.java | 4 +- .../runtime/isolation/PluginScanner.java | 25 ++++---- .../connect/runtime/isolation/PluginType.java | 16 ++--- .../runtime/isolation/ReflectionScanner.java | 31 ++++----- .../isolation/ServiceLoaderScanner.java | 18 +++--- .../connect/runtime/AbstractHerderTest.java | 5 +- .../isolation/DelegatingClassLoaderTest.java | 2 +- .../runtime/isolation/PluginDescTest.java | 63 +++++++++++++------ .../runtime/isolation/PluginUtilsTest.java | 18 +++--- .../ConnectorPluginsResourceTest.java | 55 +++++++--------- 11 files changed, 138 insertions(+), 140 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index b1a19fd46aecc..7f04914c8d12f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -33,7 +33,6 @@ import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.isolation.LoaderSwap; -import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; @@ -842,34 +841,26 @@ public List connectorPluginConfig(String pluginName) { try (LoaderSwap loaderSwap = p.withClassLoader(pluginClass.getClassLoader())) { Object plugin = p.newPlugin(pluginName); - PluginType pluginType = PluginType.from(plugin.getClass()); // Contains definitions coming from Connect framework ConfigDef baseConfigDefs = null; // Contains definitions specifically declared on the plugin ConfigDef pluginConfigDefs; - switch (pluginType) { - case SINK: - baseConfigDefs = SinkConnectorConfig.configDef(); - pluginConfigDefs = ((SinkConnector) plugin).config(); - break; - case SOURCE: - baseConfigDefs = SourceConnectorConfig.configDef(); - pluginConfigDefs = ((SourceConnector) plugin).config(); - break; - case CONVERTER: - pluginConfigDefs = ((Converter) plugin).config(); - break; - case HEADER_CONVERTER: - pluginConfigDefs = ((HeaderConverter) plugin).config(); - break; - case TRANSFORMATION: - pluginConfigDefs = ((Transformation) plugin).config(); - break; - case PREDICATE: - pluginConfigDefs = ((Predicate) plugin).config(); - break; - default: - throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate."); + if (plugin instanceof SinkConnector) { + baseConfigDefs = SinkConnectorConfig.configDef(); + pluginConfigDefs = ((SinkConnector) plugin).config(); + } else if (plugin instanceof SourceConnector) { + baseConfigDefs = SourceConnectorConfig.configDef(); + pluginConfigDefs = ((SourceConnector) plugin).config(); + } else if (plugin instanceof Converter) { + pluginConfigDefs = ((Converter) plugin).config(); + } else if (plugin instanceof HeaderConverter) { + pluginConfigDefs = ((HeaderConverter) plugin).config(); + } else if (plugin instanceof Transformation) { + pluginConfigDefs = ((Transformation) plugin).config(); + } else if (plugin instanceof Predicate) { + pluginConfigDefs = ((Predicate) plugin).config(); + } else { + throw new BadRequestException("Invalid plugin class " + pluginName + ". Valid types are sink, source, converter, header_converter, transformation, predicate."); } // Track config properties by name and, if the same property is defined in multiple places, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java index e001f104ee372..141abda79e60f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java @@ -32,12 +32,12 @@ public class PluginDesc implements Comparable> { private final String location; private final ClassLoader loader; - public PluginDesc(Class klass, String version, ClassLoader loader) { + public PluginDesc(Class klass, String version, PluginType type, ClassLoader loader) { this.klass = klass; this.name = klass.getName(); this.version = version != null ? version : "null"; this.encodedVersion = new DefaultArtifactVersion(this.version); - this.type = PluginType.from(klass); + this.type = type; this.typeName = type.toString(); this.location = loader instanceof PluginClassLoader ? ((PluginClassLoader) loader).location() diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java index 474a92f5cf54b..be159546d2d55 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java @@ -120,32 +120,32 @@ private void loadJdbcDrivers(final ClassLoader loader) { } @SuppressWarnings({"rawtypes", "unchecked"}) - protected PluginDesc pluginDesc(Class plugin, String version, PluginSource source) { - return new PluginDesc(plugin, version, source.loader()); + protected PluginDesc pluginDesc(Class plugin, String version, PluginType type, PluginSource source) { + return new PluginDesc(plugin, version, type, source.loader()); } @SuppressWarnings("unchecked") - protected SortedSet> getServiceLoaderPluginDesc(Class klass, PluginSource source) { + protected SortedSet> getServiceLoaderPluginDesc(PluginType type, PluginSource source) { SortedSet> result = new TreeSet<>(); - ServiceLoader serviceLoader = ServiceLoader.load(klass, source.loader()); + ServiceLoader serviceLoader = ServiceLoader.load((Class) type.superClass(), source.loader()); Iterator iterator = serviceLoader.iterator(); - while (handleLinkageError(klass, source, iterator::hasNext)) { + while (handleLinkageError(type, source, iterator::hasNext)) { try (LoaderSwap loaderSwap = withClassLoader(source.loader())) { T pluginImpl; try { - pluginImpl = handleLinkageError(klass, source, iterator::next); + pluginImpl = handleLinkageError(type, source, iterator::next); } catch (ServiceConfigurationError t) { log.error("Failed to discover {} in {}{}", - klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); + type, source.location(), reflectiveErrorDescription(t.getCause()), t); continue; } Class pluginKlass = (Class) pluginImpl.getClass(); if (pluginKlass.getClassLoader() != source.loader()) { log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", - pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location()); + type, pluginKlass.getClassLoader(), source.location()); continue; } - result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source)); + result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), type, source)); } } return result; @@ -154,14 +154,13 @@ protected SortedSet> getServiceLoaderPluginDesc(Class klass /** * Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s. * - * @param klass The plugin superclass which is being loaded + * @param type The plugin type which is being loaded * @param function A function on a {@link ServiceLoader}'s {@link Iterator} which may throw {@link LinkageError} * @return the return value of function * @throws Error errors thrown by the passed-in function - * @param Type being iterated over by the ServiceLoader * @param Return value of the passed-in function */ - private U handleLinkageError(Class klass, PluginSource source, Supplier function) { + private U handleLinkageError(PluginType type, PluginSource source, Supplier function) { // It's difficult to know for sure if the iterator was able to advance past the first broken // plugin class, or if it will continue to fail on that broken class for any subsequent calls // to Iterator::hasNext or Iterator::next @@ -182,7 +181,7 @@ private U handleLinkageError(Class klass, PluginSource source, Supplie || !Objects.equals(lastError.getClass(), t.getClass()) || !Objects.equals(lastError.getMessage(), t.getMessage())) { log.error("Failed to discover {} in {}{}", - klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); + type, source.location(), reflectiveErrorDescription(t.getCause()), t); } lastError = t; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java index 696e14ba8cded..0f26e84a0bb84 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java @@ -37,8 +37,7 @@ public enum PluginType { PREDICATE(Predicate.class), CONFIGPROVIDER(ConfigProvider.class), REST_EXTENSION(ConnectRestExtension.class), - CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class), - UNKNOWN(Object.class); + CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY(ConnectorClientConfigOverridePolicy.class); private final Class klass; @@ -46,19 +45,14 @@ public enum PluginType { this.klass = klass; } - public static PluginType from(Class klass) { - for (PluginType type : PluginType.values()) { - if (type.klass.isAssignableFrom(klass)) { - return type; - } - } - return UNKNOWN; - } - public String simpleName() { return klass.getSimpleName(); } + public Class superClass() { + return klass; + } + @Override public String toString() { return super.toString().toLowerCase(Locale.ROOT); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java index e38fefc78c92b..13cb082e26a62 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java @@ -84,39 +84,40 @@ protected PluginScanResult scanPlugins(PluginSource source) { Reflections reflections = new Reflections(builder); return new PluginScanResult( - getPluginDesc(reflections, SinkConnector.class, source), - getPluginDesc(reflections, SourceConnector.class, source), - getPluginDesc(reflections, Converter.class, source), - getPluginDesc(reflections, HeaderConverter.class, source), + getPluginDesc(reflections, PluginType.SINK, source), + getPluginDesc(reflections, PluginType.SOURCE, source), + getPluginDesc(reflections, PluginType.CONVERTER, source), + getPluginDesc(reflections, PluginType.HEADER_CONVERTER, source), getTransformationPluginDesc(source, reflections), getPredicatePluginDesc(source, reflections), - getServiceLoaderPluginDesc(ConfigProvider.class, source), - getServiceLoaderPluginDesc(ConnectRestExtension.class, source), - getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source) + getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source), + getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source), + getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, source) ); } @SuppressWarnings({"unchecked"}) private SortedSet>> getPredicatePluginDesc(PluginSource source, Reflections reflections) { - return (SortedSet>>) (SortedSet) getPluginDesc(reflections, Predicate.class, source); + return (SortedSet>>) (SortedSet) getPluginDesc(reflections, PluginType.PREDICATE, source); } @SuppressWarnings({"unchecked"}) private SortedSet>> getTransformationPluginDesc(PluginSource source, Reflections reflections) { - return (SortedSet>>) (SortedSet) getPluginDesc(reflections, Transformation.class, source); + return (SortedSet>>) (SortedSet) getPluginDesc(reflections, PluginType.TRANSFORMATION, source); } + @SuppressWarnings({"unchecked"}) private SortedSet> getPluginDesc( Reflections reflections, - Class klass, + PluginType type, PluginSource source ) { Set> plugins; try { - plugins = reflections.getSubTypesOf(klass); + plugins = reflections.getSubTypesOf((Class) type.superClass()); } catch (ReflectionsException e) { log.debug("Reflections scanner could not find any {} in {} for URLs: {}", - klass, source.location(), source.urls(), e); + type, source.location(), source.urls(), e); return Collections.emptySortedSet(); } @@ -128,14 +129,14 @@ private SortedSet> getPluginDesc( } if (pluginKlass.getClassLoader() != source.loader()) { log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", - pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location()); + type, pluginKlass.getClassLoader(), source.location()); continue; } try (LoaderSwap loaderSwap = withClassLoader(source.loader())) { - result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), source)); + result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), type, source)); } catch (ReflectiveOperationException | LinkageError e) { log.error("Failed to discover {} in {}: Unable to instantiate {}{}", - klass.getSimpleName(), source.location(), pluginKlass.getSimpleName(), + type, source.location(), pluginKlass.getSimpleName(), reflectiveErrorDescription(e), e); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java index 727a737ff3f34..9f36a5ad8e067 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java @@ -56,25 +56,25 @@ public class ServiceLoaderScanner extends PluginScanner { @Override protected PluginScanResult scanPlugins(PluginSource source) { return new PluginScanResult( - getServiceLoaderPluginDesc(SinkConnector.class, source), - getServiceLoaderPluginDesc(SourceConnector.class, source), - getServiceLoaderPluginDesc(Converter.class, source), - getServiceLoaderPluginDesc(HeaderConverter.class, source), + getServiceLoaderPluginDesc(PluginType.SINK, source), + getServiceLoaderPluginDesc(PluginType.SOURCE, source), + getServiceLoaderPluginDesc(PluginType.CONVERTER, source), + getServiceLoaderPluginDesc(PluginType.HEADER_CONVERTER, source), getTransformationPluginDesc(source), getPredicatePluginDesc(source), - getServiceLoaderPluginDesc(ConfigProvider.class, source), - getServiceLoaderPluginDesc(ConnectRestExtension.class, source), - getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, source) + getServiceLoaderPluginDesc(PluginType.CONFIGPROVIDER, source), + getServiceLoaderPluginDesc(PluginType.REST_EXTENSION, source), + getServiceLoaderPluginDesc(PluginType.CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY, source) ); } @SuppressWarnings({"unchecked"}) private SortedSet>> getPredicatePluginDesc(PluginSource source) { - return (SortedSet>>) (SortedSet) getServiceLoaderPluginDesc(Predicate.class, source); + return (SortedSet>>) (SortedSet) getServiceLoaderPluginDesc(PluginType.PREDICATE, source); } @SuppressWarnings({"unchecked"}) private SortedSet>> getTransformationPluginDesc(PluginSource source) { - return (SortedSet>>) (SortedSet) getServiceLoaderPluginDesc(Transformation.class, source); + return (SortedSet>>) (SortedSet) getServiceLoaderPluginDesc(PluginType.TRANSFORMATION, source); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index f1993a7e7f157..68ee2ce15612b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy; import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.runtime.isolation.PluginDesc; +import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; @@ -667,12 +668,12 @@ public void testConfigValidationPredicatesExtendResults() { @SuppressWarnings({"rawtypes", "unchecked"}) private PluginDesc> predicatePluginDesc() { - return new PluginDesc(SamplePredicate.class, "1.0", classLoader); + return new PluginDesc(SamplePredicate.class, "1.0", PluginType.PREDICATE, classLoader); } @SuppressWarnings({"rawtypes", "unchecked"}) private PluginDesc> transformationPluginDesc() { - return new PluginDesc(SampleTransformation.class, "1.0", classLoader); + return new PluginDesc(SampleTransformation.class, "1.0", PluginType.TRANSFORMATION, classLoader); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java index 4f683e3b35553..dcbfd26b847cc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java @@ -61,7 +61,7 @@ public void setUp() { SortedSet> sinkConnectors = new TreeSet<>(); // Lie to the DCL that this arbitrary class is a connector, since all real connector classes we have access to // are forced to be non-isolated by PluginUtils.shouldLoadInIsolation. - pluginDesc = new PluginDesc<>((Class) ARBITRARY_CLASS, null, pluginLoader); + pluginDesc = new PluginDesc<>((Class) ARBITRARY_CLASS, null, PluginType.SINK, pluginLoader); assertTrue(PluginUtils.shouldLoadInIsolation(pluginDesc.className())); sinkConnectors.add(pluginDesc); scanResult = new PluginScanResult( diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java index f18537d0c32d7..dff3505f5449e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.connect.runtime.isolation; -import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.Converter; @@ -52,29 +51,32 @@ public void setUp() throws Exception { @SuppressWarnings("rawtypes") @Test public void testRegularPluginDesc() { - PluginDesc connectorDesc = new PluginDesc<>( - Connector.class, + PluginDesc connectorDesc = new PluginDesc<>( + SinkConnector.class, regularVersion, + PluginType.SINK, pluginLoader ); - assertPluginDesc(connectorDesc, Connector.class, regularVersion, pluginLoader.location()); + assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, PluginType.SINK, pluginLoader.location()); PluginDesc converterDesc = new PluginDesc<>( Converter.class, snapshotVersion, + PluginType.CONVERTER, pluginLoader ); - assertPluginDesc(converterDesc, Converter.class, snapshotVersion, pluginLoader.location()); + assertPluginDesc(converterDesc, Converter.class, snapshotVersion, PluginType.CONVERTER, pluginLoader.location()); PluginDesc transformDesc = new PluginDesc<>( Transformation.class, noVersion, + PluginType.TRANSFORMATION, pluginLoader ); - assertPluginDesc(transformDesc, Transformation.class, noVersion, pluginLoader.location()); + assertPluginDesc(transformDesc, Transformation.class, noVersion, PluginType.TRANSFORMATION, pluginLoader.location()); } @SuppressWarnings("rawtypes") @@ -84,26 +86,29 @@ public void testPluginDescWithSystemClassLoader() { PluginDesc connectorDesc = new PluginDesc<>( SinkConnector.class, regularVersion, + PluginType.SINK, systemLoader ); - assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, location); + assertPluginDesc(connectorDesc, SinkConnector.class, regularVersion, PluginType.SINK, location); PluginDesc converterDesc = new PluginDesc<>( Converter.class, snapshotVersion, + PluginType.CONVERTER, systemLoader ); - assertPluginDesc(converterDesc, Converter.class, snapshotVersion, location); + assertPluginDesc(converterDesc, Converter.class, snapshotVersion, PluginType.CONVERTER, location); PluginDesc transformDesc = new PluginDesc<>( Transformation.class, noVersion, + PluginType.TRANSFORMATION, systemLoader ); - assertPluginDesc(transformDesc, Transformation.class, noVersion, location); + assertPluginDesc(transformDesc, Transformation.class, noVersion, PluginType.TRANSFORMATION, location); } @Test @@ -112,6 +117,7 @@ public void testPluginDescWithNullVersion() { PluginDesc connectorDesc = new PluginDesc<>( SourceConnector.class, null, + PluginType.SOURCE, pluginLoader ); @@ -119,6 +125,7 @@ public void testPluginDescWithNullVersion() { connectorDesc, SourceConnector.class, nullVersion, + PluginType.SOURCE, pluginLoader.location() ); @@ -126,24 +133,27 @@ public void testPluginDescWithNullVersion() { PluginDesc converterDesc = new PluginDesc<>( Converter.class, null, + PluginType.CONVERTER, systemLoader ); - assertPluginDesc(converterDesc, Converter.class, nullVersion, location); + assertPluginDesc(converterDesc, Converter.class, nullVersion, PluginType.CONVERTER, location); } @SuppressWarnings("rawtypes") @Test public void testPluginDescEquality() { - PluginDesc connectorDescPluginPath = new PluginDesc<>( - Connector.class, + PluginDesc connectorDescPluginPath = new PluginDesc<>( + SinkConnector.class, snapshotVersion, + PluginType.SINK, pluginLoader ); - PluginDesc connectorDescClasspath = new PluginDesc<>( - Connector.class, + PluginDesc connectorDescClasspath = new PluginDesc<>( + SinkConnector.class, snapshotVersion, + PluginType.SINK, systemLoader ); @@ -153,12 +163,14 @@ public void testPluginDescEquality() { PluginDesc converterDescPluginPath = new PluginDesc<>( Converter.class, noVersion, + PluginType.CONVERTER, pluginLoader ); PluginDesc converterDescClasspath = new PluginDesc<>( Converter.class, noVersion, + PluginType.CONVERTER, systemLoader ); @@ -168,12 +180,14 @@ public void testPluginDescEquality() { PluginDesc transformDescPluginPath = new PluginDesc<>( Transformation.class, null, + PluginType.TRANSFORMATION, pluginLoader ); PluginDesc transformDescClasspath = new PluginDesc<>( Transformation.class, noVersion, + PluginType.TRANSFORMATION, pluginLoader ); @@ -183,15 +197,17 @@ public void testPluginDescEquality() { @SuppressWarnings("rawtypes") @Test public void testPluginDescComparison() { - PluginDesc connectorDescPluginPath = new PluginDesc<>( - Connector.class, + PluginDesc connectorDescPluginPath = new PluginDesc<>( + SinkConnector.class, regularVersion, + PluginType.SINK, pluginLoader ); - PluginDesc connectorDescClasspath = new PluginDesc<>( - Connector.class, + PluginDesc connectorDescClasspath = new PluginDesc<>( + SinkConnector.class, newerVersion, + PluginType.SINK, systemLoader ); @@ -200,12 +216,14 @@ public void testPluginDescComparison() { PluginDesc converterDescPluginPath = new PluginDesc<>( Converter.class, noVersion, + PluginType.CONVERTER, pluginLoader ); PluginDesc converterDescClasspath = new PluginDesc<>( Converter.class, snapshotVersion, + PluginType.CONVERTER, systemLoader ); @@ -214,12 +232,14 @@ public void testPluginDescComparison() { PluginDesc transformDescPluginPath = new PluginDesc<>( Transformation.class, null, + PluginType.TRANSFORMATION, pluginLoader ); PluginDesc transformDescClasspath = new PluginDesc<>( Transformation.class, regularVersion, + PluginType.TRANSFORMATION, systemLoader ); @@ -228,12 +248,14 @@ public void testPluginDescComparison() { PluginDesc predicateDescPluginPath = new PluginDesc<>( Predicate.class, regularVersion, + PluginType.PREDICATE, pluginLoader ); PluginDesc predicateDescClasspath = new PluginDesc<>( Predicate.class, regularVersion, + PluginType.PREDICATE, systemLoader ); @@ -244,13 +266,14 @@ private static void assertPluginDesc( PluginDesc desc, Class klass, String version, + PluginType type, String location ) { assertEquals(desc.pluginClass(), klass); assertEquals(desc.className(), klass.getName()); assertEquals(desc.version(), version); - assertEquals(desc.type(), PluginType.from(klass)); - assertEquals(desc.typeName(), PluginType.from(klass).toString()); + assertEquals(desc.type(), type); + assertEquals(desc.typeName(), type.toString()); assertEquals(desc.location(), location); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java index 0f830c8a8aac1..4dd168a3faf0d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java @@ -508,11 +508,11 @@ public void testPluginUrlsWithRelativeSymlinkForwards() throws Exception { @Test public void testNonCollidingAliases() { SortedSet> sinkConnectors = new TreeSet<>(); - sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, PluginType.SINK, MockSinkConnector.class.getClassLoader())); SortedSet> sourceConnectors = new TreeSet<>(); - sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, MockSourceConnector.class.getClassLoader())); + sourceConnectors.add(new PluginDesc<>(MockSourceConnector.class, null, PluginType.SOURCE, MockSourceConnector.class.getClassLoader())); SortedSet> converters = new TreeSet<>(); - converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + converters.add(new PluginDesc<>(CollidingConverter.class, null, PluginType.CONVERTER, CollidingConverter.class.getClassLoader())); PluginScanResult result = new PluginScanResult( sinkConnectors, sourceConnectors, @@ -540,8 +540,8 @@ public void testNonCollidingAliases() { public void testMultiVersionAlias() { SortedSet> sinkConnectors = new TreeSet<>(); // distinct versions don't cause an alias collision (the class name is the same) - sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, MockSinkConnector.class.getClassLoader())); - sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", MockSinkConnector.class.getClassLoader())); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, null, PluginType.SINK, MockSinkConnector.class.getClassLoader())); + sinkConnectors.add(new PluginDesc<>(MockSinkConnector.class, "1.0", PluginType.SINK, MockSinkConnector.class.getClassLoader())); assertEquals(2, sinkConnectors.size()); PluginScanResult result = new PluginScanResult( sinkConnectors, @@ -564,9 +564,9 @@ public void testMultiVersionAlias() { @Test public void testCollidingPrunedAlias() { SortedSet> converters = new TreeSet<>(); - converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + converters.add(new PluginDesc<>(CollidingConverter.class, null, PluginType.CONVERTER, CollidingConverter.class.getClassLoader())); SortedSet> headerConverters = new TreeSet<>(); - headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, CollidingHeaderConverter.class.getClassLoader())); + headerConverters.add(new PluginDesc<>(CollidingHeaderConverter.class, null, PluginType.HEADER_CONVERTER, CollidingHeaderConverter.class.getClassLoader())); PluginScanResult result = new PluginScanResult( Collections.emptySortedSet(), Collections.emptySortedSet(), @@ -589,9 +589,9 @@ public void testCollidingPrunedAlias() { @Test public void testCollidingSimpleAlias() { SortedSet> converters = new TreeSet<>(); - converters.add(new PluginDesc<>(CollidingConverter.class, null, CollidingConverter.class.getClassLoader())); + converters.add(new PluginDesc<>(CollidingConverter.class, null, PluginType.CONVERTER, CollidingConverter.class.getClassLoader())); SortedSet>> transformations = new TreeSet<>(); - transformations.add(new PluginDesc<>((Class>) (Class) Colliding.class, null, Colliding.class.getClassLoader())); + transformations.add(new PluginDesc<>((Class>) (Class) Colliding.class, null, PluginType.TRANSFORMATION, Colliding.class.getClassLoader())); PluginScanResult result = new PluginScanResult( Collections.emptySortedSet(), Collections.emptySortedSet(), diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 57149ca498d04..4442da3ebe75b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigValue; +import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.converters.LongConverter; @@ -37,7 +38,6 @@ import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.Plugins; -import org.apache.kafka.connect.runtime.isolation.ReflectionScanner; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; @@ -118,25 +118,26 @@ public class ConnectorPluginsResourceTest { static { try { - SINK_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(VerifiableSinkConnector.class)); - SINK_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(MockSinkConnector.class)); + String appVersion = AppInfoParser.getVersion(); + SINK_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(VerifiableSinkConnector.class, appVersion, PluginType.SINK)); + SINK_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(MockSinkConnector.class, appVersion, PluginType.SINK)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(VerifiableSourceConnector.class)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(MockSourceConnector.class)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(SchemaSourceConnector.class)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(ConnectorPluginsResourceTestConnector.class)); + SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(VerifiableSourceConnector.class, appVersion, PluginType.SOURCE)); + SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(MockSourceConnector.class, appVersion, PluginType.SOURCE)); + SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(SchemaSourceConnector.class, appVersion, PluginType.SOURCE)); + SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(ConnectorPluginsResourceTestConnector.class, appVersion, PluginType.SOURCE)); - CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(StringConverter.class)); - CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(LongConverter.class)); + CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER)); + CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER)); - HEADER_CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(StringConverter.class)); - HEADER_CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(LongConverter.class)); + HEADER_CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER)); + HEADER_CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER)); - TRANSFORMATION_PLUGINS.add(new MockConnectorPluginDesc<>(RegexRouter.class)); - TRANSFORMATION_PLUGINS.add(new MockConnectorPluginDesc<>(TimestampConverter.Key.class)); + TRANSFORMATION_PLUGINS.add(new MockConnectorPluginDesc<>(RegexRouter.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION)); + TRANSFORMATION_PLUGINS.add(new MockConnectorPluginDesc<>(TimestampConverter.Key.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION)); - PREDICATE_PLUGINS.add(new MockConnectorPluginDesc<>(HasHeaderKey.class)); - PREDICATE_PLUGINS.add(new MockConnectorPluginDesc<>(RecordIsTombstone.class)); + PREDICATE_PLUGINS.add(new MockConnectorPluginDesc<>(HasHeaderKey.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE)); + PREDICATE_PLUGINS.add(new MockConnectorPluginDesc<>(RecordIsTombstone.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE)); } catch (Exception e) { fail("Failed setting up plugins"); } @@ -353,8 +354,8 @@ public void testListConnectorPlugins() { @Test public void testConnectorPluginsIncludesClassTypeAndVersionInformation() throws Exception { - PluginInfo sinkInfo = newInfo(SampleSinkConnector.class); - PluginInfo sourceInfo = newInfo(SampleSourceConnector.class); + PluginInfo sinkInfo = new PluginInfo(new MockConnectorPluginDesc<>(SampleSinkConnector.class, SampleSinkConnector.VERSION, PluginType.SINK)); + PluginInfo sourceInfo = new PluginInfo(new MockConnectorPluginDesc<>(SampleSourceConnector.class, SampleSourceConnector.VERSION, PluginType.SOURCE)); assertEquals(PluginType.SINK.toString(), sinkInfo.type()); assertEquals(PluginType.SOURCE.toString(), sourceInfo.type()); assertEquals(SampleSinkConnector.VERSION, sinkInfo.version()); @@ -425,12 +426,7 @@ public void testGetConnectorConfigDef() { } protected static PluginInfo newInfo(PluginDesc pluginDesc) { - return new PluginInfo(new MockConnectorPluginDesc<>(pluginDesc.pluginClass(), pluginDesc.version())); - } - - protected static PluginInfo newInfo(Class klass) - throws Exception { - return new PluginInfo(new MockConnectorPluginDesc<>(klass)); + return new PluginInfo(new MockConnectorPluginDesc<>(pluginDesc.pluginClass(), pluginDesc.version(), pluginDesc.type())); } public static class MockPluginClassLoader extends PluginClassLoader { @@ -446,17 +442,10 @@ public String location() { } public static class MockConnectorPluginDesc extends PluginDesc { - public MockConnectorPluginDesc(Class klass, String version) { - super(klass, version, new MockPluginClassLoader(null, new URL[0])); + public MockConnectorPluginDesc(Class klass, String version, PluginType type) { + super(klass, version, type, new MockPluginClassLoader(null, new URL[0])); } - public MockConnectorPluginDesc(Class klass) throws Exception { - super( - klass, - ReflectionScanner.versionFor(klass), - new MockPluginClassLoader(null, new URL[0]) - ); - } } /* Name here needs to be unique as we are testing the aliasing mechanism */ @@ -476,7 +465,7 @@ public static class ConnectorPluginsResourceTestConnector extends SourceConnecto @Override public String version() { - return "1.0"; + return AppInfoParser.getVersion(); } @Override From 65833f4705257f17df9c3914f5f8ff177be88771 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Mon, 24 Jul 2023 17:40:28 -0700 Subject: [PATCH 2/9] fixup: make ReflectionScanner#versionFor private Signed-off-by: Greg Harris --- .../kafka/connect/runtime/isolation/ReflectionScanner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java index 13cb082e26a62..214297fbff19a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java @@ -69,7 +69,7 @@ public class ReflectionScanner extends PluginScanner { private static final Logger log = LoggerFactory.getLogger(ReflectionScanner.class); - public static String versionFor(Class pluginKlass) throws ReflectiveOperationException { + private static String versionFor(Class pluginKlass) throws ReflectiveOperationException { T pluginImpl = pluginKlass.getDeclaredConstructor().newInstance(); return versionFor(pluginImpl); } From 07bae745858d22236425bdc9cc590eee28ba2e37 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Mon, 24 Jul 2023 18:08:51 -0700 Subject: [PATCH 3/9] fixup: make PluginDesc compareTo and equals consistent Signed-off-by: Greg Harris --- .../connect/runtime/isolation/PluginDesc.java | 6 ++- .../runtime/isolation/PluginDescTest.java | 45 ++++++++++++++++++- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java index 141abda79e60f..781bded044dca 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java @@ -115,6 +115,10 @@ public int compareTo(PluginDesc other) { int versionComp = encodedVersion.compareTo(other.encodedVersion); // isolated plugins appear after classpath plugins when they have identical versions. int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader); - return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : isolatedComp); + // choose an arbitrary order between different classloaders and types + int loaderComp = loader.hashCode() - other.loader.hashCode(); + int typeComp = type.compareTo(other.type); + return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : + (isolatedComp != 0 ? isolatedComp : (loaderComp != 0 ? loaderComp : typeComp))); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java index dff3505f5449e..52b242c23b689 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java @@ -17,14 +17,19 @@ package org.apache.kafka.connect.runtime.isolation; +import org.apache.kafka.common.config.provider.ConfigProvider; +import org.apache.kafka.common.config.provider.FileConfigProvider; +import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.source.SourceConnector; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; import org.junit.Before; import org.junit.Test; +import java.net.MalformedURLException; import java.net.URL; import java.nio.file.Paths; @@ -39,6 +44,7 @@ public class PluginDescTest { private final String snapshotVersion = "1.0.0-SNAPSHOT"; private final String noVersion = "undefined"; private PluginClassLoader pluginLoader; + private PluginClassLoader otherPluginLoader; @Before public void setUp() throws Exception { @@ -46,6 +52,7 @@ public void setUp() throws Exception { URL location = Paths.get("/tmp").toUri().toURL(); // Normally parent will be a DelegatingClassLoader. pluginLoader = new PluginClassLoader(location, new URL[0], systemLoader); + otherPluginLoader = new PluginClassLoader(location, new URL[0], systemLoader); } @SuppressWarnings("rawtypes") @@ -194,9 +201,9 @@ public void testPluginDescEquality() { assertNotEquals(transformDescPluginPath, transformDescClasspath); } - @SuppressWarnings("rawtypes") + @SuppressWarnings({"rawtypes", "unchecked"}) @Test - public void testPluginDescComparison() { + public void testPluginDescComparison() throws MalformedURLException { PluginDesc connectorDescPluginPath = new PluginDesc<>( SinkConnector.class, regularVersion, @@ -260,6 +267,40 @@ public void testPluginDescComparison() { ); assertNewer(predicateDescPluginPath, predicateDescClasspath); + + PluginDesc configProviderDescPluginPath = new PluginDesc<>( + FileConfigProvider.class, + regularVersion, + PluginType.CONFIGPROVIDER, + pluginLoader + ); + + PluginDesc configProviderDescOtherPluginLoader = new PluginDesc<>( + FileConfigProvider.class, + regularVersion, + PluginType.CONFIGPROVIDER, + otherPluginLoader + ); + + assertTrue("Different plugin loaders should have an ordering", + configProviderDescPluginPath.compareTo(configProviderDescOtherPluginLoader) != 0); + + + PluginDesc jsonConverterPlugin = new PluginDesc<>( + JsonConverter.class, + regularVersion, + PluginType.CONVERTER, + systemLoader + ); + + PluginDesc jsonHeaderConverterPlugin = new PluginDesc<>( + JsonConverter.class, + regularVersion, + PluginType.HEADER_CONVERTER, + systemLoader + ); + + assertNewer(jsonConverterPlugin, (PluginDesc) (PluginDesc) jsonHeaderConverterPlugin); } private static void assertPluginDesc( From 6694bf2d8ba145576c8a3692cfb7da13325936a2 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Tue, 25 Jul 2023 09:40:11 -0700 Subject: [PATCH 4/9] fixup: review comments Signed-off-by: Greg Harris --- .../kafka/connect/runtime/isolation/PluginDesc.java | 11 +++++++---- .../connect/runtime/isolation/PluginScanner.java | 6 +++--- .../connect/runtime/isolation/ReflectionScanner.java | 4 ++-- .../connect/runtime/isolation/PluginDescTest.java | 6 +++--- 4 files changed, 15 insertions(+), 12 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java index 781bded044dca..49e85191d029f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java @@ -21,7 +21,7 @@ import java.util.Objects; -public class PluginDesc implements Comparable> { +public class PluginDesc implements Comparable> { public static final String UNDEFINED_VERSION = "undefined"; private final Class klass; private final String name; @@ -110,7 +110,7 @@ public int hashCode() { } @Override - public int compareTo(PluginDesc other) { + public int compareTo(PluginDesc other) { int nameComp = name.compareTo(other.name); int versionComp = encodedVersion.compareTo(other.encodedVersion); // isolated plugins appear after classpath plugins when they have identical versions. @@ -118,7 +118,10 @@ public int compareTo(PluginDesc other) { // choose an arbitrary order between different classloaders and types int loaderComp = loader.hashCode() - other.loader.hashCode(); int typeComp = type.compareTo(other.type); - return nameComp != 0 ? nameComp : (versionComp != 0 ? versionComp : - (isolatedComp != 0 ? isolatedComp : (loaderComp != 0 ? loaderComp : typeComp))); + return nameComp != 0 ? nameComp : + versionComp != 0 ? versionComp : + isolatedComp != 0 ? isolatedComp : + loaderComp != 0 ? loaderComp : + typeComp; } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java index be159546d2d55..acb5b668cf301 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java @@ -136,13 +136,13 @@ protected SortedSet> getServiceLoaderPluginDesc(PluginType typ pluginImpl = handleLinkageError(type, source, iterator::next); } catch (ServiceConfigurationError t) { log.error("Failed to discover {} in {}{}", - type, source.location(), reflectiveErrorDescription(t.getCause()), t); + type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); continue; } Class pluginKlass = (Class) pluginImpl.getClass(); if (pluginKlass.getClassLoader() != source.loader()) { log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", - type, pluginKlass.getClassLoader(), source.location()); + type.simpleName(), pluginKlass.getClassLoader(), source.location()); continue; } result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), type, source)); @@ -181,7 +181,7 @@ private U handleLinkageError(PluginType type, PluginSource source, Supplier< || !Objects.equals(lastError.getClass(), t.getClass()) || !Objects.equals(lastError.getMessage(), t.getMessage())) { log.error("Failed to discover {} in {}{}", - type, source.location(), reflectiveErrorDescription(t.getCause()), t); + type.simpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); } lastError = t; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java index 214297fbff19a..ad5c00c42ec6f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ReflectionScanner.java @@ -129,14 +129,14 @@ private SortedSet> getPluginDesc( } if (pluginKlass.getClassLoader() != source.loader()) { log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading", - type, pluginKlass.getClassLoader(), source.location()); + pluginKlass, pluginKlass.getClassLoader(), source.location()); continue; } try (LoaderSwap loaderSwap = withClassLoader(source.loader())) { result.add(pluginDesc(pluginKlass, versionFor(pluginKlass), type, source)); } catch (ReflectiveOperationException | LinkageError e) { log.error("Failed to discover {} in {}: Unable to instantiate {}{}", - type, source.location(), pluginKlass.getSimpleName(), + type.simpleName(), source.location(), pluginKlass.getSimpleName(), reflectiveErrorDescription(e), e); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java index 52b242c23b689..44654e109d110 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java @@ -203,7 +203,7 @@ public void testPluginDescEquality() { @SuppressWarnings({"rawtypes", "unchecked"}) @Test - public void testPluginDescComparison() throws MalformedURLException { + public void testPluginDescComparison() { PluginDesc connectorDescPluginPath = new PluginDesc<>( SinkConnector.class, regularVersion, @@ -300,7 +300,7 @@ public void testPluginDescComparison() throws MalformedURLException { systemLoader ); - assertNewer(jsonConverterPlugin, (PluginDesc) (PluginDesc) jsonHeaderConverterPlugin); + assertNewer(jsonConverterPlugin, jsonHeaderConverterPlugin); } private static void assertPluginDesc( @@ -318,7 +318,7 @@ private static void assertPluginDesc( assertEquals(desc.location(), location); } - private static void assertNewer(PluginDesc older, PluginDesc newer) { + private static void assertNewer(PluginDesc older, PluginDesc newer) { assertTrue(newer + " should be newer than " + older, older.compareTo(newer) < 0); } } From fe5c059776405e5bbb63ca5ea711996ec3cf620b Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Tue, 25 Jul 2023 09:58:34 -0700 Subject: [PATCH 5/9] fixup: remove hashCode collision by comparing locations, which are now unique. Signed-off-by: Greg Harris --- .../connect/runtime/isolation/PluginDesc.java | 4 ++-- .../runtime/isolation/PluginScanResult.java | 2 +- .../connect/runtime/isolation/PluginUtils.java | 8 ++++---- .../kafka/connect/runtime/isolation/Plugins.java | 2 +- .../connect/runtime/isolation/PluginDescTest.java | 4 ++-- .../runtime/isolation/PluginScannerTest.java | 14 +++++++------- .../connect/runtime/isolation/TestPlugins.java | 7 ++++--- 7 files changed, 21 insertions(+), 20 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java index 49e85191d029f..230945d00273e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java @@ -115,8 +115,8 @@ public int compareTo(PluginDesc other) { int versionComp = encodedVersion.compareTo(other.encodedVersion); // isolated plugins appear after classpath plugins when they have identical versions. int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader); - // choose an arbitrary order between different classloaders and types - int loaderComp = loader.hashCode() - other.loader.hashCode(); + // choose an arbitrary order between different locations and types + int loaderComp = location.compareTo(other.location); int typeComp = type.compareTo(other.type); return nameComp != 0 ? nameComp : versionComp != 0 ? versionComp : diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java index ae015ed350705..b452beb9b633c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanResult.java @@ -88,7 +88,7 @@ public PluginScanResult(List results) { ); } - private static > SortedSet merge(List results, Function> accessor) { + private static > SortedSet merge(List results, Function> accessor) { SortedSet merged = new TreeSet<>(); for (PluginScanResult element : results) { merged.addAll(accessor.apply(element)); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index e88bfa1b03d24..77d16857ee957 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -196,12 +196,12 @@ public static boolean isClassFile(Path path) { return path.toString().toLowerCase(Locale.ROOT).endsWith(".class"); } - public static List pluginLocations(String pluginPath) { + public static Set pluginLocations(String pluginPath) { if (pluginPath == null) { - return Collections.emptyList(); + return Collections.emptySet(); } String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1); - List pluginLocations = new ArrayList<>(); + Set pluginLocations = new HashSet<>(); for (String path : pluginPathElements) { try { Path pluginPathElement = Paths.get(path).toAbsolutePath(); @@ -328,7 +328,7 @@ public static List pluginUrls(Path topPath) throws IOException { return Arrays.asList(archives.toArray(new Path[0])); } - public static Set pluginSources(List pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) { + public static Set pluginSources(Set pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) { Set pluginSources = new HashSet<>(); for (Path pluginLocation : pluginLocations) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java index 83dec38a6fb22..36e20270abcd3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java @@ -63,7 +63,7 @@ public Plugins(Map props) { // VisibleForTesting Plugins(Map props, ClassLoader parent, ClassLoaderFactory factory) { String pluginPath = WorkerConfig.pluginPath(props); - List pluginLocations = PluginUtils.pluginLocations(pluginPath); + Set pluginLocations = PluginUtils.pluginLocations(pluginPath); delegatingLoader = factory.newDelegatingClassLoader(parent); Set pluginSources = PluginUtils.pluginSources(pluginLocations, delegatingLoader, factory); scanResult = initLoaders(pluginSources); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java index 44654e109d110..b4217b27ed07d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java @@ -29,7 +29,6 @@ import org.junit.Before; import org.junit.Test; -import java.net.MalformedURLException; import java.net.URL; import java.nio.file.Paths; @@ -50,9 +49,10 @@ public class PluginDescTest { public void setUp() throws Exception { // Fairly simple use case, thus no need to create a random directory here yet. URL location = Paths.get("/tmp").toUri().toURL(); + URL otherLocation = Paths.get("/tmp-other").toUri().toURL(); // Normally parent will be a DelegatingClassLoader. pluginLoader = new PluginClassLoader(location, new URL[0], systemLoader); - otherPluginLoader = new PluginClassLoader(location, new URL[0], systemLoader); + otherPluginLoader = new PluginClassLoader(otherLocation, new URL[0], systemLoader); } @SuppressWarnings("rawtypes") diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java index b9b80fa51109c..d6a85b294cf32 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java @@ -70,7 +70,7 @@ public PluginScannerTest(ScannerType scannerType) { @Test public void testScanningEmptyPluginPath() { PluginScanResult result = scan( - Collections.emptyList() + Collections.emptySet() ); assertTrue(result.isEmpty()); } @@ -91,7 +91,7 @@ public void testScanningInvalidUberJar() throws Exception { pluginDir.newFile("invalid.jar"); PluginScanResult result = scan( - Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) + Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath()) ); assertTrue(result.isEmpty()); } @@ -102,7 +102,7 @@ public void testScanningPluginDirContainsInvalidJarsOnly() throws Exception { pluginDir.newFile("my-plugin/invalid.jar"); PluginScanResult result = scan( - Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) + Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath()) ); assertTrue(result.isEmpty()); } @@ -110,7 +110,7 @@ public void testScanningPluginDirContainsInvalidJarsOnly() throws Exception { @Test public void testScanningNoPlugins() { PluginScanResult result = scan( - Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) + Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath()) ); assertTrue(result.isEmpty()); } @@ -120,7 +120,7 @@ public void testScanningPluginDirEmpty() throws Exception { pluginDir.newFolder("my-plugin"); PluginScanResult result = scan( - Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) + Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath()) ); assertTrue(result.isEmpty()); } @@ -137,7 +137,7 @@ public void testScanningMixOfValidAndInvalidPlugins() throws Exception { } PluginScanResult result = scan( - Collections.singletonList(pluginDir.getRoot().toPath().toAbsolutePath()) + Collections.singleton(pluginDir.getRoot().toPath().toAbsolutePath()) ); Set classes = new HashSet<>(); result.forEach(pluginDesc -> classes.add(pluginDesc.className())); @@ -145,7 +145,7 @@ public void testScanningMixOfValidAndInvalidPlugins() throws Exception { assertEquals(expectedClasses, classes); } - private PluginScanResult scan(List pluginLocations) { + private PluginScanResult scan(Set pluginLocations) { ClassLoaderFactory factory = new ClassLoaderFactory(); Set pluginSources = PluginUtils.pluginSources(pluginLocations, PluginScannerTest.class.getClassLoader(), factory); return scanner.discoverPlugins(pluginSources); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java index bd87dd41516bb..d93fef181aa8d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/TestPlugins.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.Predicate; import java.util.jar.Attributes; import java.util.jar.JarEntry; @@ -248,7 +249,7 @@ private static void assertAvailable() throws AssertionError { * @return A list of plugin jar filenames * @throws AssertionError if any plugin failed to load, or no plugins were loaded. */ - public static List pluginPath() { + public static Set pluginPath() { return pluginPath(defaultPlugins()); } @@ -262,14 +263,14 @@ public static String pluginPathJoined() { * @return A list of plugin jar filenames containing the specified test plugins * @throws AssertionError if any plugin failed to load, or no plugins were loaded. */ - public static List pluginPath(TestPlugin... plugins) { + public static Set pluginPath(TestPlugin... plugins) { assertAvailable(); return Arrays.stream(plugins) .filter(Objects::nonNull) .map(TestPlugin::resourceDir) .distinct() .map(PLUGIN_JARS::get) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); } public static String pluginPathJoined(TestPlugin... plugins) { From 3546f8040878d136c505cbd951c42a9b3f64ec73 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Tue, 25 Jul 2023 12:12:43 -0700 Subject: [PATCH 6/9] fixup: use LinkedHashSet to preserve ordering (not required but nice-to-have) Signed-off-by: Greg Harris --- .../apache/kafka/connect/runtime/isolation/PluginUtils.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java index 77d16857ee957..b77c97d1e5db5 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -201,7 +202,7 @@ public static Set pluginLocations(String pluginPath) { return Collections.emptySet(); } String[] pluginPathElements = COMMA_WITH_WHITESPACE.split(pluginPath.trim(), -1); - Set pluginLocations = new HashSet<>(); + Set pluginLocations = new LinkedHashSet<>(); for (String path : pluginPathElements) { try { Path pluginPathElement = Paths.get(path).toAbsolutePath(); @@ -329,7 +330,7 @@ public static List pluginUrls(Path topPath) throws IOException { } public static Set pluginSources(Set pluginLocations, ClassLoader classLoader, PluginClassLoaderFactory factory) { - Set pluginSources = new HashSet<>(); + Set pluginSources = new LinkedHashSet<>(); for (Path pluginLocation : pluginLocations) { try { From 149481f429e7a688501efa05f58c28b42acecfb5 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Wed, 26 Jul 2023 09:31:28 -0700 Subject: [PATCH 7/9] fixup: avoid NPE in compareTo Signed-off-by: Greg Harris --- .../org/apache/kafka/connect/runtime/isolation/PluginDesc.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java index 230945d00273e..df17fcf3aa692 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java @@ -116,7 +116,7 @@ public int compareTo(PluginDesc other) { // isolated plugins appear after classpath plugins when they have identical versions. int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader); // choose an arbitrary order between different locations and types - int loaderComp = location.compareTo(other.location); + int loaderComp = Objects.compare(location, other.location, String::compareTo); int typeComp = type.compareTo(other.type); return nameComp != 0 ? nameComp : versionComp != 0 ? versionComp : From edd7c3cd887cf0f58a3635aae7d6b2de2b480609 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Wed, 26 Jul 2023 09:58:00 -0700 Subject: [PATCH 8/9] fixup: eliminate nulls from PluginDesc arguments, adjust test mocking Signed-off-by: Greg Harris --- .../runtime/isolation/PluginClassLoader.java | 4 +- .../connect/runtime/isolation/PluginDesc.java | 13 ++-- .../isolation/DelegatingClassLoaderTest.java | 1 + .../runtime/isolation/PluginDescTest.java | 20 +++++ .../ConnectorPluginsResourceTest.java | 76 +++++++------------ 5 files changed, 57 insertions(+), 57 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java index da05966e13a83..d7af0ef215f36 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java @@ -55,7 +55,7 @@ public class PluginClassLoader extends URLClassLoader { */ public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) { super(urls, parent); - this.pluginLocation = pluginLocation; + this.pluginLocation = Objects.requireNonNull(pluginLocation, "Plugin location must be non-null"); } /** @@ -67,7 +67,7 @@ public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) { */ public PluginClassLoader(URL pluginLocation, URL[] urls) { super(urls); - this.pluginLocation = pluginLocation; + this.pluginLocation = Objects.requireNonNull(pluginLocation, "Plugin location must be non-null"); } /** diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java index df17fcf3aa692..395dc92d7b542 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginDesc.java @@ -33,14 +33,15 @@ public class PluginDesc implements Comparable> { private final ClassLoader loader; public PluginDesc(Class klass, String version, PluginType type, ClassLoader loader) { - this.klass = klass; - this.name = klass.getName(); + this.klass = Objects.requireNonNull(klass, "Plugin class must be non-null"); + this.name = this.klass.getName(); this.version = version != null ? version : "null"; this.encodedVersion = new DefaultArtifactVersion(this.version); - this.type = type; - this.typeName = type.toString(); + this.type = Objects.requireNonNull(type, "Plugin type must be non-null"); + this.typeName = this.type.toString(); + Objects.requireNonNull(loader, "Plugin classloader must be non-null"); this.location = loader instanceof PluginClassLoader - ? ((PluginClassLoader) loader).location() + ? Objects.requireNonNull(((PluginClassLoader) loader).location(), "Plugin location must be non-null") : "classpath"; this.loader = loader; } @@ -116,7 +117,7 @@ public int compareTo(PluginDesc other) { // isolated plugins appear after classpath plugins when they have identical versions. int isolatedComp = Boolean.compare(other.loader instanceof PluginClassLoader, loader instanceof PluginClassLoader); // choose an arbitrary order between different locations and types - int loaderComp = Objects.compare(location, other.location, String::compareTo); + int loaderComp = location.compareTo(other.location); int typeComp = type.compareTo(other.type); return nameComp != 0 ? nameComp : versionComp != 0 ? versionComp : diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java index dcbfd26b847cc..a3f2ab142dc5f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java @@ -61,6 +61,7 @@ public void setUp() { SortedSet> sinkConnectors = new TreeSet<>(); // Lie to the DCL that this arbitrary class is a connector, since all real connector classes we have access to // are forced to be non-isolated by PluginUtils.shouldLoadInIsolation. + when(pluginLoader.location()).thenReturn("some-location"); pluginDesc = new PluginDesc<>((Class) ARBITRARY_CLASS, null, PluginType.SINK, pluginLoader); assertTrue(PluginUtils.shouldLoadInIsolation(pluginDesc.className())); sinkConnectors.add(pluginDesc); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java index b4217b27ed07d..171fbde699770 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginDescTest.java @@ -34,7 +34,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class PluginDescTest { private final ClassLoader systemLoader = ClassLoader.getSystemClassLoader(); @@ -303,6 +306,23 @@ public void testPluginDescComparison() { assertNewer(jsonConverterPlugin, jsonHeaderConverterPlugin); } + @Test + public void testNullArguments() { + // Null version is acceptable + PluginDesc sink = new PluginDesc<>(SinkConnector.class, null, PluginType.SINK, systemLoader); + assertEquals("null", sink.version()); + + // Direct nulls are not acceptable for other arguments + assertThrows(NullPointerException.class, () -> new PluginDesc<>(null, regularVersion, PluginType.SINK, systemLoader)); + assertThrows(NullPointerException.class, () -> new PluginDesc<>(SinkConnector.class, regularVersion, null, systemLoader)); + assertThrows(NullPointerException.class, () -> new PluginDesc<>(SinkConnector.class, regularVersion, PluginType.SINK, null)); + + // PluginClassLoaders must have non-null locations + PluginClassLoader nullLocationLoader = mock(PluginClassLoader.class); + when(nullLocationLoader.location()).thenReturn(null); + assertThrows(NullPointerException.class, () -> new PluginDesc<>(SinkConnector.class, regularVersion, PluginType.SINK, nullLocationLoader)); + } + private static void assertPluginDesc( PluginDesc desc, Class klass, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 4442da3ebe75b..9e5dc55743a39 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -34,7 +34,6 @@ import org.apache.kafka.connect.runtime.SampleSinkConnector; import org.apache.kafka.connect.runtime.SampleSourceConnector; import org.apache.kafka.connect.runtime.distributed.DistributedHerder; -import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.isolation.Plugins; @@ -61,7 +60,6 @@ import org.mockito.ArgumentCaptor; import javax.ws.rs.BadRequestException; -import java.net.URL; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -109,36 +107,38 @@ public class ConnectorPluginsResourceTest { private static final ConfigInfos PARTIAL_CONFIG_INFOS; private static final int ERROR_COUNT = 0; private static final int PARTIAL_CONFIG_ERROR_COUNT = 1; - private static final Set> SINK_CONNECTOR_PLUGINS = new TreeSet<>(); - private static final Set> SOURCE_CONNECTOR_PLUGINS = new TreeSet<>(); - private static final Set> CONVERTER_PLUGINS = new TreeSet<>(); - private static final Set> HEADER_CONVERTER_PLUGINS = new TreeSet<>(); - private static final Set> TRANSFORMATION_PLUGINS = new TreeSet<>(); - private static final Set> PREDICATE_PLUGINS = new TreeSet<>(); + private static final Set> SINK_CONNECTOR_PLUGINS = new TreeSet<>(); + private static final Set> SOURCE_CONNECTOR_PLUGINS = new TreeSet<>(); + private static final Set> CONVERTER_PLUGINS = new TreeSet<>(); + private static final Set> HEADER_CONVERTER_PLUGINS = new TreeSet<>(); + private static final Set> TRANSFORMATION_PLUGINS = new TreeSet<>(); + private static final Set> PREDICATE_PLUGINS = new TreeSet<>(); static { try { + ClassLoader classLoader = ConnectorPluginsResourceTest.class.getClassLoader(); String appVersion = AppInfoParser.getVersion(); - SINK_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(VerifiableSinkConnector.class, appVersion, PluginType.SINK)); - SINK_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(MockSinkConnector.class, appVersion, PluginType.SINK)); + SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(VerifiableSinkConnector.class, appVersion, PluginType.SINK, classLoader)); + SINK_CONNECTOR_PLUGINS.add(new PluginDesc<>(MockSinkConnector.class, appVersion, PluginType.SINK, classLoader)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(VerifiableSourceConnector.class, appVersion, PluginType.SOURCE)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(MockSourceConnector.class, appVersion, PluginType.SOURCE)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(SchemaSourceConnector.class, appVersion, PluginType.SOURCE)); - SOURCE_CONNECTOR_PLUGINS.add(new MockConnectorPluginDesc<>(ConnectorPluginsResourceTestConnector.class, appVersion, PluginType.SOURCE)); + SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(VerifiableSourceConnector.class, appVersion, PluginType.SOURCE, classLoader)); + SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(MockSourceConnector.class, appVersion, PluginType.SOURCE, classLoader)); + SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(SchemaSourceConnector.class, appVersion, PluginType.SOURCE, classLoader)); + SOURCE_CONNECTOR_PLUGINS.add(new PluginDesc<>(ConnectorPluginsResourceTestConnector.class, appVersion, PluginType.SOURCE, classLoader)); - CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER)); - CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER)); + CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader)); + CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.CONVERTER, classLoader)); - HEADER_CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER)); - HEADER_CONVERTER_PLUGINS.add(new MockConnectorPluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER)); + HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(StringConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER, classLoader)); + HEADER_CONVERTER_PLUGINS.add(new PluginDesc<>(LongConverter.class, PluginDesc.UNDEFINED_VERSION, PluginType.HEADER_CONVERTER, classLoader)); - TRANSFORMATION_PLUGINS.add(new MockConnectorPluginDesc<>(RegexRouter.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION)); - TRANSFORMATION_PLUGINS.add(new MockConnectorPluginDesc<>(TimestampConverter.Key.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION)); + TRANSFORMATION_PLUGINS.add(new PluginDesc<>(RegexRouter.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION, classLoader)); + TRANSFORMATION_PLUGINS.add(new PluginDesc<>(TimestampConverter.Key.class, PluginDesc.UNDEFINED_VERSION, PluginType.TRANSFORMATION, classLoader)); - PREDICATE_PLUGINS.add(new MockConnectorPluginDesc<>(HasHeaderKey.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE)); - PREDICATE_PLUGINS.add(new MockConnectorPluginDesc<>(RecordIsTombstone.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE)); + PREDICATE_PLUGINS.add(new PluginDesc<>(HasHeaderKey.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader)); + PREDICATE_PLUGINS.add(new PluginDesc<>(RecordIsTombstone.class, PluginDesc.UNDEFINED_VERSION, PluginType.PREDICATE, classLoader)); } catch (Exception e) { + e.printStackTrace(); fail("Failed setting up plugins"); } } @@ -345,7 +345,7 @@ public void testListConnectorPlugins() { Set expectedConnectorPlugins = Stream.of(SINK_CONNECTOR_PLUGINS, SOURCE_CONNECTOR_PLUGINS) .flatMap(Collection::stream) .filter(p -> !excludes.contains(p.pluginClass())) - .map(ConnectorPluginsResourceTest::newInfo) + .map(PluginInfo::new) .collect(Collectors.toSet()); Set actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(true)); assertEquals(expectedConnectorPlugins, actualConnectorPlugins); @@ -354,8 +354,9 @@ public void testListConnectorPlugins() { @Test public void testConnectorPluginsIncludesClassTypeAndVersionInformation() throws Exception { - PluginInfo sinkInfo = new PluginInfo(new MockConnectorPluginDesc<>(SampleSinkConnector.class, SampleSinkConnector.VERSION, PluginType.SINK)); - PluginInfo sourceInfo = new PluginInfo(new MockConnectorPluginDesc<>(SampleSourceConnector.class, SampleSourceConnector.VERSION, PluginType.SOURCE)); + ClassLoader classLoader = ConnectorPluginsResourceTest.class.getClassLoader(); + PluginInfo sinkInfo = new PluginInfo(new PluginDesc<>(SampleSinkConnector.class, SampleSinkConnector.VERSION, PluginType.SINK, classLoader)); + PluginInfo sourceInfo = new PluginInfo(new PluginDesc<>(SampleSourceConnector.class, SampleSourceConnector.VERSION, PluginType.SOURCE, classLoader)); assertEquals(PluginType.SINK.toString(), sinkInfo.type()); assertEquals(PluginType.SOURCE.toString(), sourceInfo.type()); assertEquals(SampleSinkConnector.VERSION, sinkInfo.version()); @@ -400,7 +401,7 @@ public void testListAllPlugins() { PREDICATE_PLUGINS ).flatMap(Collection::stream) .filter(p -> !excludes.contains(p.pluginClass())) - .map(ConnectorPluginsResourceTest::newInfo) + .map(PluginInfo::new) .collect(Collectors.toSet()); Set actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(false)); assertEquals(expectedConnectorPlugins, actualConnectorPlugins); @@ -425,29 +426,6 @@ public void testGetConnectorConfigDef() { } } - protected static PluginInfo newInfo(PluginDesc pluginDesc) { - return new PluginInfo(new MockConnectorPluginDesc<>(pluginDesc.pluginClass(), pluginDesc.version(), pluginDesc.type())); - } - - public static class MockPluginClassLoader extends PluginClassLoader { - - public MockPluginClassLoader(URL pluginLocation, URL[] urls) { - super(pluginLocation, urls); - } - - @Override - public String location() { - return "/tmp/mockpath"; - } - } - - public static class MockConnectorPluginDesc extends PluginDesc { - public MockConnectorPluginDesc(Class klass, String version, PluginType type) { - super(klass, version, type, new MockPluginClassLoader(null, new URL[0])); - } - - } - /* Name here needs to be unique as we are testing the aliasing mechanism */ public static class ConnectorPluginsResourceTestConnector extends SourceConnector { From b40f7cb99ecd27f43e89824c00f1db55ef810608 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Wed, 26 Jul 2023 10:04:09 -0700 Subject: [PATCH 9/9] fixup: remove unused PluginClassLoader constructor Signed-off-by: Greg Harris --- .../connect/runtime/isolation/PluginClassLoader.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java index d7af0ef215f36..825d1e6d1140e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java @@ -58,18 +58,6 @@ public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) { this.pluginLocation = Objects.requireNonNull(pluginLocation, "Plugin location must be non-null"); } - /** - * Constructor that defines the system classloader as parent of this plugin classloader. - * - * @param pluginLocation the top-level location of the plugin to be loaded in isolation by this - * classloader. - * @param urls the list of urls from which to load classes and resources for this plugin. - */ - public PluginClassLoader(URL pluginLocation, URL[] urls) { - super(urls); - this.pluginLocation = Objects.requireNonNull(pluginLocation, "Plugin location must be non-null"); - } - /** * Returns the top-level location of the classes and dependencies required by the plugin that * is loaded by this classloader.