diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
index b136b3e3e057..6777be50ab50 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java
@@ -214,7 +214,7 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() {
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      config.validate();
+      config.validate(jdbcType);
       // If we define a partition column, we follow a different route.
       @Nullable String partitionColumn = config.getPartitionColumn();
       @Nullable String location = config.getLocation();
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
index 0837ea686ddf..23decba1a633 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProviderTest.java
@@ -22,6 +22,7 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 
+import com.google.auto.service.AutoService;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -47,6 +48,25 @@
 @RunWith(JUnit4.class)
 public class JdbcReadSchemaTransformProviderTest {
 
+  @AutoService(SchemaTransformProvider.class)
+  public static class ReadFromDerbySchemaTransformProvider extends JdbcReadSchemaTransformProvider {
+
+    @Override
+    public String identifier() {
+      return "beam:schematransform:org.apache.beam:derby_read:v1";
+    }
+
+    @Override
+    public String description() {
+      return inheritedDescription("Derby", "ReadFromDerby", "derby", 5432);
+    }
+
+    @Override
+    protected String jdbcType() {
+      return "derby";
+    }
+  }
+
   private static final JdbcIO.DataSourceConfiguration DATA_SOURCE_CONFIGURATION =
       JdbcIO.DataSourceConfiguration.create(
           "org.apache.derby.jdbc.EmbeddedDriver", "jdbc:derby:memory:testDB;create=true");
@@ -199,6 +219,32 @@ public void testReadWithJdbcTypeSpecified() {
     pipeline.run();
   }
 
+  @Test
+  public void testReadWithJdbcDerbyTransformTypeSpecified() {
+    JdbcReadSchemaTransformProvider provider = null;
+    for (SchemaTransformProvider p : ServiceLoader.load(SchemaTransformProvider.class)) {
+      if (p instanceof ReadFromDerbySchemaTransformProvider) {
+        provider = (JdbcReadSchemaTransformProvider) p;
+        break;
+      }
+    }
+    assertNotNull(provider);
+
+    PCollection<Row> output =
+        PCollectionRowTuple.empty(pipeline)
+            .apply(
+                provider.from(
+                    ReadFromDerbySchemaTransformProvider.JdbcReadSchemaTransformConfiguration
+                        .builder()
+                        .setJdbcUrl(DATA_SOURCE_CONFIGURATION.getUrl().get())
+                        .setLocation(READ_TABLE_NAME)
+                        .build()))
+            .get("output");
+    Long expected = Long.valueOf(EXPECTED_ROW_COUNT);
+    PAssert.that(output.apply(Count.globally())).containsInAnyOrder(expected);
+    pipeline.run();
+  }
+
   @Test
   public void testReadWithPartitions() {
     JdbcReadSchemaTransformProvider provider = null;