Skip to content

Commit

Permalink
[improve][io]: Add validation for JDBC sink not supporting primitive …
Browse files Browse the repository at this point in the history
…schema (apache#22376)

(cherry picked from commit a503efc)
  • Loading branch information
shibd authored and srinath-ctds committed Apr 23, 2024
1 parent a5b9be3 commit fd4c9c6
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 9 deletions.
7 changes: 7 additions & 0 deletions pulsar-io/jdbc/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;

Expand Down Expand Up @@ -137,6 +138,10 @@ public Mutation createMutation(Record<GenericObject> message) {
}
recordValueGetter = (k) -> data.get(k);
} else {
SchemaType schemaType = message.getSchema().getSchemaInfo().getType();
if (schemaType.isPrimitive()) {
throw new UnsupportedOperationException("Primitive schema is not supported: " + schemaType);
}
recordValueGetter = (key) -> ((GenericRecord) record).getField(key);
}
String action = message.getProperties().get(ACTION_PROPERTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.util.Utf8;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.functions.api.Record;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -143,5 +147,26 @@ private Schema createFieldAndGetSchema(Function<SchemaBuilder.FieldAssembler<Sch
return consumer.apply(record).endRecord().getFields().get(0).schema();
}

@Test(expectedExceptions = UnsupportedOperationException.class,
expectedExceptionsMessageRegExp = "Primitive schema is not supported.*")
@SuppressWarnings("unchecked")
public void testNotSupportPrimitiveSchema() {
BaseJdbcAutoSchemaSink baseJdbcAutoSchemaSink = new BaseJdbcAutoSchemaSink() {};
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(org.apache.pulsar.client.api.Schema.STRING);
Record<? extends GenericObject> record = new Record<GenericRecord>() {
@Override
public org.apache.pulsar.client.api.Schema<GenericRecord> getSchema() {
return autoConsumeSchema;
}

@Override
public GenericRecord getValue() {
return null;
}
};
baseJdbcAutoSchemaSink.createMutation((Record<GenericObject>) record);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.KeyValue;
Expand Down Expand Up @@ -282,20 +283,24 @@ public void TestUnknownAction() throws Exception {
}

@Test
@SuppressWarnings("unchecked")
public void TestUpdateAction() throws Exception {

AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(schema);

Foo updateObj = new Foo();
updateObj.setField1("ValueOfField3");
updateObj.setField2("ValueOfField3");
updateObj.setField3(4);

byte[] updateBytes = schema.encode(updateObj);
Message<GenericObject> updateMessage = mock(MessageImpl.class);
Message<GenericRecord> updateMessage = mock(MessageImpl.class);
CompletableFuture<Boolean> future = new CompletableFuture<>();
Record<GenericObject> updateRecord = PulsarRecord.<GenericObject>builder()
Record<? extends GenericObject> updateRecord = PulsarRecord.<GenericRecord>builder()
.message(updateMessage)
.schema(autoConsumeSchema)
.topicName("fake_topic_name")
.ackFunction(() -> future.complete(null))
.build();
Expand All @@ -312,7 +317,7 @@ public void TestUpdateAction() throws Exception {
updateMessage.getValue().toString(),
updateRecord.getValue().toString());

jdbcSink.write(updateRecord);
jdbcSink.write((Record<GenericObject>) updateRecord);
future.get(1, TimeUnit.SECONDS);

// value has been written to db, read it out and verify.
Expand All @@ -325,18 +330,22 @@ public void TestUpdateAction() throws Exception {
}

@Test
@SuppressWarnings("unchecked")
public void TestDeleteAction() throws Exception {

AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(schema);

Foo deleteObj = new Foo();
deleteObj.setField3(5);

byte[] deleteBytes = schema.encode(deleteObj);
Message<GenericObject> deleteMessage = mock(MessageImpl.class);
Message<GenericRecord> deleteMessage = mock(MessageImpl.class);
CompletableFuture<Boolean> future = new CompletableFuture<>();
Record<GenericObject> deleteRecord = PulsarRecord.<GenericObject>builder()
Record<? extends GenericObject> deleteRecord = PulsarRecord.<GenericRecord>builder()
.message(deleteMessage)
.schema(autoConsumeSchema)
.topicName("fake_topic_name")
.ackFunction(() -> future.complete(null))
.build();
Expand All @@ -352,7 +361,7 @@ public void TestDeleteAction() throws Exception {
deleteMessage.getValue().toString(),
deleteRecord.getValue().toString());

jdbcSink.write(deleteRecord);
jdbcSink.write((Record<GenericObject>) deleteRecord);
future.get(1, TimeUnit.SECONDS);

// value has been written to db, read it out and verify.
Expand Down Expand Up @@ -848,25 +857,29 @@ public void testNullValueAction(NullValueActionTestConfig config) throws Excepti
}
}

@SuppressWarnings("unchecked")
private Record<GenericObject> createMockFooRecord(Foo record, Map<String, String> actionProperties,
CompletableFuture<Boolean> future) {
Message<GenericObject> insertMessage = mock(MessageImpl.class);
Message<GenericRecord> insertMessage = mock(MessageImpl.class);
GenericSchema<GenericRecord> genericAvroSchema;
AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(true).build());
AutoConsumeSchema autoConsumeSchema = new AutoConsumeSchema();
autoConsumeSchema.setSchema(schema);

byte[] insertBytes = schema.encode(record);

Record<GenericObject> insertRecord = PulsarRecord.<GenericObject>builder()
Record<? extends GenericObject> insertRecord = PulsarRecord.<GenericRecord>builder()
.message(insertMessage)
.topicName("fake_topic_name")
.schema(autoConsumeSchema)
.ackFunction(() -> future.complete(true))
.failFunction(() -> future.complete(false))
.build();

genericAvroSchema = new GenericAvroSchema(schema.getSchemaInfo());
when(insertMessage.getValue()).thenReturn(genericAvroSchema.decode(insertBytes));
when(insertMessage.getProperties()).thenReturn(actionProperties);
return insertRecord;
return (Record<GenericObject>) insertRecord;
}

}

0 comments on commit fd4c9c6

Please sign in to comment.