Skip to content

Commit

Permalink
add schema serde
Browse files Browse the repository at this point in the history
  • Loading branch information
MatrixHB committed Sep 3, 2022
1 parent cea2d6c commit 53f755c
Show file tree
Hide file tree
Showing 26 changed files with 539 additions and 270 deletions.
5 changes: 5 additions & 0 deletions rocketmq-streams-channel-rocketmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>schema-registry-client</artifactId>
<version>0.0.4-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.apache.rocketmq.streams.schema;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
import org.apache.rocketmq.schema.registry.client.config.AvroSerializerConfig;
import org.apache.rocketmq.schema.registry.client.serde.avro.SpecificAvroSerde;

public class AvroSchemaWrapper implements SchemaWrapper {

private static final Log LOG = LogFactory.getLog(AvroSchemaWrapper.class);

private SpecificAvroSerde avroSerde;

public AvroSchemaWrapper(SchemaConfig schemaConfig) {
try {
if (schemaConfig.getSchemaRegistryUrl() == null) {
avroSerde = new SpecificAvroSerde();
Map<String, Object> configs = new HashMap<>();
configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE, Class.forName(schemaConfig.getClassName()));
configs.put(AvroSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
avroSerde.configure(configs);
} else {
SchemaRegistryClient schemaRegistryClient =
SchemaRegistryClientFactory.newClient(schemaConfig.getSchemaRegistryUrl(), null);
avroSerde = new SpecificAvroSerde(schemaRegistryClient);
Map<String, Object> configs = new HashMap<>();
configs.put(AvroSerializerConfig.DESERIALIZE_TARGET_TYPE, Class.forName(schemaConfig.getClassName()));
avroSerde.configure(configs);
}
} catch (Exception e) {
LOG.error("init AvroSchemaWrapper failed, " + schemaConfig.toString(), e);
throw new RuntimeException("init AvroSchemaWrapper failed");
}
}

@Override
public Object deserialize(MessageExt messageExt) {
String subject = messageExt.getTopic();
byte[] msgBody = messageExt.getBody();
return avroSerde.deserializer().deserialize(subject, msgBody);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package org.apache.rocketmq.streams.schema;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClient;
import org.apache.rocketmq.schema.registry.client.SchemaRegistryClientFactory;
import org.apache.rocketmq.schema.registry.client.config.JsonSerializerConfig;
import org.apache.rocketmq.schema.registry.client.serde.json.JsonSerde;

/**
* @author huitong
*/
public class JsonSchemaWrapper implements SchemaWrapper {

private static final Log LOG = LogFactory.getLog(JsonSchemaWrapper.class);

private JsonSerde jsonSerde;

public JsonSchemaWrapper(SchemaConfig schemaConfig) {
try {
if (schemaConfig.getSchemaRegistryUrl() == null) {
jsonSerde = new JsonSerde();
Map<String, Object> configs = new HashMap<>();
configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, Class.forName(schemaConfig.getClassName()));
configs.put(JsonSerializerConfig.SKIP_SCHEMA_REGISTRY, true);
jsonSerde.configure(configs);
} else {
SchemaRegistryClient schemaRegistryClient =
SchemaRegistryClientFactory.newClient(schemaConfig.getSchemaRegistryUrl(), null);
jsonSerde = new JsonSerde(schemaRegistryClient);
Map<String, Object> configs = new HashMap<>();
configs.put(JsonSerializerConfig.DESERIALIZE_TARGET_TYPE, Class.forName(schemaConfig.getClassName()));
jsonSerde.configure(configs);
}
} catch (Exception e) {
LOG.error("init AvroSchemaWrapper failed, " + schemaConfig.toString(), e);
throw new RuntimeException("init AvroSchemaWrapper failed");
}
}

@Override
public Object deserialize(MessageExt messageExt) {

String subject = messageExt.getTopic();
byte[] msgBody = messageExt.getBody();
return jsonSerde.deserializer().deserialize(subject, msgBody);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.apache.rocketmq.streams.schema;

import java.io.Serializable;

public class SchemaConfig implements Serializable {

/**
* json, avro, protobuf, kyro, thrift, etc.
*/
private String schemaType;

/**
* allowed to be null, if null means skip schema registry
*/
private String schemaRegistryUrl;

/**
* deserialize target class
*/
private String className;

public SchemaConfig() {
}

public SchemaConfig(SchemaType schemaType, Class targetClass) {
this.schemaType = schemaType.name();
this.className = targetClass.getName();
}

public SchemaConfig(SchemaType schemaType, Class targetClass, String schemaRegistryUrl) {
this.schemaType = schemaType.name();
this.schemaRegistryUrl = schemaRegistryUrl;
this.className = targetClass.getName();
}

public String getSchemaType() {
return schemaType;
}

public void setSchemaType(String schemaType) {
this.schemaType = schemaType;
}

public String getSchemaRegistryUrl() {
return schemaRegistryUrl;
}

public void setSchemaRegistryUrl(String schemaRegistryUrl) {
this.schemaRegistryUrl = schemaRegistryUrl;
}

public String getClassName() {
return className;
}

public void setClassName(String className) {
this.className = className;
}

@Override
public String toString() {
return "SchemaConfig{" +
"schemaType='" + schemaType + '\'' +
", schemaRegistryUrl='" + schemaRegistryUrl + '\'' +
", className='" + className + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.apache.rocketmq.streams.schema;

public enum SchemaType {

/**
* Avro type
*/
AVRO("AVRO"),
/**
* Protobuf type
*/
PROTOBUF("PROTOBUF"),
/**
* Thrift type
*/
THRIFT("THRIFT"),
/**
* Json type
*/
JSON("JSON"),
/**
* Text type for reserved
*/
TEXT("TEXT"),
/**
* Binlog type for reserved
*/
BINLOG("BINLOG");

private final String value;

SchemaType(final String value) {
this.value = value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.apache.rocketmq.streams.schema;

import org.apache.rocketmq.common.message.MessageExt;

public interface SchemaWrapper {

Object deserialize(MessageExt messageExt);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.apache.rocketmq.streams.schema;

import java.util.HashMap;
import java.util.Map;

public class SchemaWrapperFactory {

private static Map<String /*topic*/, SchemaWrapper> schemaWrapperCache = new HashMap<>();

public static SchemaWrapper createIfAbsent(String topic, SchemaConfig schemaConfig) {
SchemaWrapper schemaWrapper = schemaWrapperCache.get(topic);
if (schemaWrapper != null) {
return schemaWrapper;
}
if (SchemaType.JSON.name().equals(schemaConfig.getSchemaType())) {
JsonSchemaWrapper jsonSchemaWrapper = new JsonSchemaWrapper(schemaConfig);
schemaWrapperCache.putIfAbsent(topic, jsonSchemaWrapper);
return jsonSchemaWrapper;
} else if (SchemaType.AVRO.name().equals(schemaConfig.getSchemaType())) {
AvroSchemaWrapper avroSchemaWrapper = new AvroSchemaWrapper(schemaConfig);
schemaWrapperCache.putIfAbsent(topic, avroSchemaWrapper);
return avroSchemaWrapper;
} else {
throw new RuntimeException("scheme type " + schemaConfig.getSchemaType() + " not supported");
}
}

}
Loading

0 comments on commit 53f755c

Please sign in to comment.