From fd4c9c6c8ec1e7a027f8145a5f443835a6f0d125 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 29 Mar 2024 08:33:27 +0800 Subject: [PATCH] [improve][io]: Add validation for JDBC sink not supporting primitive schema (#22376) (cherry picked from commit a503efc826e60b8e26f9792aeb45223374b8f4ca) --- pulsar-io/jdbc/core/pom.xml | 7 +++++ .../io/jdbc/BaseJdbcAutoSchemaSink.java | 5 +++ .../io/jdbc/BaseJdbcAutoSchemaSinkTest.java | 25 +++++++++++++++ .../pulsar/io/jdbc/SqliteJdbcSinkTest.java | 31 +++++++++++++------ 4 files changed, 59 insertions(+), 9 deletions(-) diff --git a/pulsar-io/jdbc/core/pom.xml b/pulsar-io/jdbc/core/pom.xml index 5c0adf6be27de..b0e5a3d0b3e8d 100644 --- a/pulsar-io/jdbc/core/pom.xml +++ b/pulsar-io/jdbc/core/pom.xml @@ -71,6 +71,13 @@ provided + + ${project.groupId} + pulsar-client-original + ${project.version} + test + + \ No newline at end of file diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java index 36c3674091932..3655688c0f3ad 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSink.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId; @@ -137,6 +138,10 @@ public Mutation createMutation(Record message) { } recordValueGetter = (k) -> data.get(k); } else { + SchemaType schemaType = message.getSchema().getSchemaInfo().getType(); + if (schemaType.isPrimitive()) { + throw new UnsupportedOperationException("Primitive schema is not supported: " + schemaType); + } recordValueGetter = (key) -> ((GenericRecord) record).getField(key); } String action = message.getProperties().get(ACTION_PROPERTY); diff --git a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java index b15eb832242c7..c088dd3c42c32 100644 --- a/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java +++ b/pulsar-io/jdbc/core/src/test/java/org/apache/pulsar/io/jdbc/BaseJdbcAutoSchemaSinkTest.java @@ -22,6 +22,10 @@ import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.util.Utf8; +import org.apache.pulsar.client.api.schema.GenericObject; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; +import org.apache.pulsar.functions.api.Record; import org.testng.Assert; import org.testng.annotations.Test; @@ -143,5 +147,26 @@ private Schema createFieldAndGetSchema(Function record = new Record() { + @Override + public org.apache.pulsar.client.api.Schema getSchema() { + return autoConsumeSchema; + } + + @Override + public GenericRecord getValue() { + return null; + } + }; + baseJdbcAutoSchemaSink.createMutation((Record) record); + } + } \ No newline at end of file diff --git a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java index d9ed4cbd442bf..ca01615bef193 100644 --- a/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java +++ b/pulsar-io/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java @@ -48,6 +48,7 @@ import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.schema.KeyValue; @@ -282,9 +283,12 @@ public void TestUnknownAction() throws Exception { } @Test + @SuppressWarnings("unchecked") public void TestUpdateAction() throws Exception { AvroSchema schema = AvroSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build()); + AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); + autoConsumeSchema.setSchema(schema); Foo updateObj = new Foo(); updateObj.setField1("ValueOfField3"); @@ -292,10 +296,11 @@ public void TestUpdateAction() throws Exception { updateObj.setField3(4); byte[] updateBytes = schema.encode(updateObj); - Message updateMessage = mock(MessageImpl.class); + Message updateMessage = mock(MessageImpl.class); CompletableFuture future = new CompletableFuture<>(); - Record updateRecord = PulsarRecord.builder() + Record updateRecord = PulsarRecord.builder() .message(updateMessage) + .schema(autoConsumeSchema) .topicName("fake_topic_name") .ackFunction(() -> future.complete(null)) .build(); @@ -312,7 +317,7 @@ public void TestUpdateAction() throws Exception { updateMessage.getValue().toString(), updateRecord.getValue().toString()); - jdbcSink.write(updateRecord); + jdbcSink.write((Record) updateRecord); future.get(1, TimeUnit.SECONDS); // value has been written to db, read it out and verify. @@ -325,18 +330,22 @@ public void TestUpdateAction() throws Exception { } @Test + @SuppressWarnings("unchecked") public void TestDeleteAction() throws Exception { AvroSchema schema = AvroSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build()); + AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); + autoConsumeSchema.setSchema(schema); Foo deleteObj = new Foo(); deleteObj.setField3(5); byte[] deleteBytes = schema.encode(deleteObj); - Message deleteMessage = mock(MessageImpl.class); + Message deleteMessage = mock(MessageImpl.class); CompletableFuture future = new CompletableFuture<>(); - Record deleteRecord = PulsarRecord.builder() + Record deleteRecord = PulsarRecord.builder() .message(deleteMessage) + .schema(autoConsumeSchema) .topicName("fake_topic_name") .ackFunction(() -> future.complete(null)) .build(); @@ -352,7 +361,7 @@ public void TestDeleteAction() throws Exception { deleteMessage.getValue().toString(), deleteRecord.getValue().toString()); - jdbcSink.write(deleteRecord); + jdbcSink.write((Record) deleteRecord); future.get(1, TimeUnit.SECONDS); // value has been written to db, read it out and verify. @@ -848,17 +857,21 @@ public void testNullValueAction(NullValueActionTestConfig config) throws Excepti } } + @SuppressWarnings("unchecked") private Record createMockFooRecord(Foo record, Map actionProperties, CompletableFuture future) { - Message insertMessage = mock(MessageImpl.class); + Message insertMessage = mock(MessageImpl.class); GenericSchema genericAvroSchema; AvroSchema schema = AvroSchema.of(SchemaDefinition.builder().withPojo(Foo.class).withAlwaysAllowNull(true).build()); + AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema(); + autoConsumeSchema.setSchema(schema); byte[] insertBytes = schema.encode(record); - Record insertRecord = PulsarRecord.builder() + Record insertRecord = PulsarRecord.builder() .message(insertMessage) .topicName("fake_topic_name") + .schema(autoConsumeSchema) .ackFunction(() -> future.complete(true)) .failFunction(() -> future.complete(false)) .build(); @@ -866,7 +879,7 @@ private Record createMockFooRecord(Foo record, Map) insertRecord; } }