diff --git a/external/storm-avro/README.md b/external/storm-avro/README.md new file mode 100644 index 00000000000..ed28e4f3c41 --- /dev/null +++ b/external/storm-avro/README.md @@ -0,0 +1,77 @@ +#Storm Avro + +Storm integration for [Apache Avro](http://avro.apache.org/). + +## GenericAvroSerializer & FixedAvroSerializer & ConfluentAvroSerializer +These Serializers are the implementations of `AbstractAvroSerializer`. +To serialize Avro GenericRecord between worker and another worker you **must** register the appropriate Kryo serializers with your topology configuration. A convenience +method is provided for this: + +`AvroUtils.addAvroKryoSerializations(conf);` + +By default Storm will use the `GenericAvroSerializer` to handle serialization. This will work, but there are much faster options available if you can pre-define the schemas you will be using or utilize an external schema registry. An implementation using the Confluent Schema Registry is provided, but others can be implemented and provided to Storm. +The configuration property is `Config.TOPOLOGY_AVRO_SERIALIZER`. + +Please see the javadoc for classes in org.apache.storm.avro for information about using the built-in options or creating your own. + + +### AvroSchemaRegistry + +```java +public interface AvroSchemaRegistry extends Serializable { + String getFingerprint(Schema schema); + + Schema getSchema(String fingerPrint); +} +``` + +### AbstractAvroSerializer + +```java +public abstract class AbstractAvroSerializer extends Serializer implements AvroSchemaRegistry { + + @Override + public void write(Kryo kryo, Output output, GenericContainer record) { } + + @Override + public GenericContainer read(Kryo kryo, Input input, Class aClass) { } +} +``` + +## DirectAvroSerializer + +`DirectAvroSerializer` provide the ability to serialize Avro `GenericContainer` directly. + +```java +public interface DirectAvroSerializer extends Serializable { + + public byte[] serialize(GenericContainer record) throws IOException; + + public GenericContainer deserialize(byte[] bytes, Schema schema) throws IOException; + +} +``` + +## License + +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. + +## Committer Sponsors + + * Aaron Niskode-Dossett ([dossett@gmail.com](mailto:dossett@gmail.com)) + diff --git a/external/storm-avro/pom.xml b/external/storm-avro/pom.xml new file mode 100644 index 00000000000..33e38263167 --- /dev/null +++ b/external/storm-avro/pom.xml @@ -0,0 +1,97 @@ + + + + 4.0.0 + + + storm + org.apache.storm + 2.0.0-SNAPSHOT + ../../pom.xml + + + storm-avro + + + + dossett + Aaron Niskode-Dossett + dossett@gmail.com + + + vesense + Xin Wang + data.xinwang@gmail.com + + + + + + confluent + http://packages.confluent.io/maven + + + + + + org.apache.storm + storm-core + ${project.version} + provided + + + org.apache.avro + avro + 1.7.7 + + + io.confluent + kafka-avro-serializer + 1.0 + + + org.slf4j + slf4j-log4j12 + + + + + com.google.guava + guava + + + commons-lang + commons-lang + + + commons-codec + commons-codec + + + + junit + junit + test + + + org.mockito + mockito-all + test + + + diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java b/external/storm-avro/src/main/java/org/apache/storm/avro/AbstractAvroSerializer.java similarity index 98% rename from external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java rename to external/storm-avro/src/main/java/org/apache/storm/avro/AbstractAvroSerializer.java index ddf015d2e24..4e8f7ef35f4 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AbstractAvroSerializer.java +++ b/external/storm-avro/src/main/java/org/apache/storm/avro/AbstractAvroSerializer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.hdfs.avro; +package org.apache.storm.avro; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java b/external/storm-avro/src/main/java/org/apache/storm/avro/AvroSchemaRegistry.java similarity index 96% rename from external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java rename to external/storm-avro/src/main/java/org/apache/storm/avro/AvroSchemaRegistry.java index 0d1dc8bb799..d35d2f059a9 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroSchemaRegistry.java +++ b/external/storm-avro/src/main/java/org/apache/storm/avro/AvroSchemaRegistry.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.hdfs.avro; +package org.apache.storm.avro; import org.apache.avro.Schema; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java b/external/storm-avro/src/main/java/org/apache/storm/avro/AvroUtils.java similarity index 87% rename from external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java rename to external/storm-avro/src/main/java/org/apache/storm/avro/AvroUtils.java index 5549291c0b0..b49e26585b3 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/AvroUtils.java +++ b/external/storm-avro/src/main/java/org/apache/storm/avro/AvroUtils.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.hdfs.avro; +package org.apache.storm.avro; import org.apache.avro.generic.GenericData; import org.apache.storm.Config; @@ -25,15 +25,15 @@ public class AvroUtils { * A helper method to extract avro serialization configurations from the topology configuration and register * specific kryo serializers as necessary. A default serializer will be provided if none is specified in the * configuration. "avro.serializer" should specify the complete class name of the serializer, e.g. - * "org.apache.stgorm.hdfs.avro.GenericAvroSerializer" + * "org.apache.stgorm.avro.GenericAvroSerializer" * * @param conf The topology configuration * @throws ClassNotFoundException If the specified serializer cannot be located. */ public static void addAvroKryoSerializations(Config conf) throws ClassNotFoundException { final Class serializerClass; - if (conf.containsKey("avro.serializer")) { - serializerClass = Class.forName((String)conf.get("avro.serializer")); + if (conf.containsKey(Config.TOPOLOGY_AVRO_SERIALIZER)) { + serializerClass = Class.forName((String)conf.get(Config.TOPOLOGY_AVRO_SERIALIZER)); } else { serializerClass = GenericAvroSerializer.class; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java b/external/storm-avro/src/main/java/org/apache/storm/avro/ConfluentAvroSerializer.java similarity index 93% rename from external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java rename to external/storm-avro/src/main/java/org/apache/storm/avro/ConfluentAvroSerializer.java index 2008a3e8729..c43ab179e27 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java +++ b/external/storm-avro/src/main/java/org/apache/storm/avro/ConfluentAvroSerializer.java @@ -15,13 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.hdfs.avro; +package org.apache.storm.avro; import com.esotericsoftware.kryo.Kryo; + import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; + import org.apache.avro.Schema; +import org.apache.storm.Config; import java.io.IOException; import java.util.Map; @@ -50,11 +53,11 @@ public class ConfluentAvroSerializer extends AbstractAvroSerializer { * See Storm's SerializationFactory class for details * * @param k Unused but needs to be present for Serialization Factory to find this constructor - * @param stormConf The global storm configuration. Must define "avro.schemaregistry.confluent" to locate the + * @param stormConf The global storm configuration. Must define "topology.avro.confluent.schema.registry.url" to locate the * confluent schema registry. Should in the form of "http://HOST:PORT" */ public ConfluentAvroSerializer(Kryo k, Map stormConf) { - url = (String) stormConf.get("avro.schemaregistry.confluent"); + url = (String) stormConf.get(Config.TOPOLOGY_AVRO_CONFLUENT_SCHEMA_REGISTRY_URL); this.theClient = new CachedSchemaRegistryClient(this.url, 10000); } diff --git a/external/storm-avro/src/main/java/org/apache/storm/avro/DefaultDirectAvroSerializer.java b/external/storm-avro/src/main/java/org/apache/storm/avro/DefaultDirectAvroSerializer.java new file mode 100644 index 00000000000..81307ed12ec --- /dev/null +++ b/external/storm-avro/src/main/java/org/apache/storm/avro/DefaultDirectAvroSerializer.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.avro; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericContainer; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; + +public class DefaultDirectAvroSerializer implements DirectAvroSerializer { + + @Override + public byte[] serialize(GenericContainer record) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DatumWriter writer = new GenericDatumWriter( + record.getSchema()); + Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null); + writer.write(record, encoder); + encoder.flush(); + byte[] bytes = out.toByteArray(); + out.close(); + return bytes; + } + + @Override + public GenericContainer deserialize(byte[] bytes, Schema schema) + throws IOException { + DatumReader reader = new GenericDatumReader( + schema); + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); + GenericContainer record = reader.read(null, decoder); + return record; + } + +} diff --git a/external/storm-avro/src/main/java/org/apache/storm/avro/DirectAvroSerializer.java b/external/storm-avro/src/main/java/org/apache/storm/avro/DirectAvroSerializer.java new file mode 100644 index 00000000000..c6daed08cdb --- /dev/null +++ b/external/storm-avro/src/main/java/org/apache/storm/avro/DirectAvroSerializer.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.avro; + +import java.io.IOException; +import java.io.Serializable; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericContainer; + +public interface DirectAvroSerializer extends Serializable { + + public byte[] serialize(GenericContainer record) throws IOException; + + public GenericContainer deserialize(byte[] bytes, Schema schema) throws IOException; + +} diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java b/external/storm-avro/src/main/java/org/apache/storm/avro/FixedAvroSerializer.java similarity index 98% rename from external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java rename to external/storm-avro/src/main/java/org/apache/storm/avro/FixedAvroSerializer.java index 4dd5fdcfce9..c13e7caa1dd 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/FixedAvroSerializer.java +++ b/external/storm-avro/src/main/java/org/apache/storm/avro/FixedAvroSerializer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.hdfs.avro; +package org.apache.storm.avro; import org.apache.avro.Schema; import org.apache.avro.SchemaNormalization; diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java b/external/storm-avro/src/main/java/org/apache/storm/avro/GenericAvroSerializer.java similarity index 97% rename from external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java rename to external/storm-avro/src/main/java/org/apache/storm/avro/GenericAvroSerializer.java index ecf8c491761..48d185976fe 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/GenericAvroSerializer.java +++ b/external/storm-avro/src/main/java/org/apache/storm/avro/GenericAvroSerializer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.hdfs.avro; +package org.apache.storm.avro; import org.apache.avro.Schema; diff --git a/external/storm-avro/src/test/java/org/apache/storm/avro/TestDirectAvroSerializer.java b/external/storm-avro/src/test/java/org/apache/storm/avro/TestDirectAvroSerializer.java new file mode 100644 index 00000000000..86234694acb --- /dev/null +++ b/external/storm-avro/src/test/java/org/apache/storm/avro/TestDirectAvroSerializer.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.avro; + +import java.io.IOException; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.Assert; +import org.junit.Test; + +public class TestDirectAvroSerializer { + private static final String schemaString1 = "{\"type\":\"record\"," + + "\"name\":\"stormtest1\"," + + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; + private static final Schema schema1; + + DirectAvroSerializer ser = new DefaultDirectAvroSerializer(); + + static { + Schema.Parser parser = new Schema.Parser(); + schema1 = parser.parse(schemaString1); + } + + @Test + public void testSchemas() { + GenericRecord record = new GenericData.Record(schema1); + record.put("foo1", "xin"); + record.put("int1", 2016); + try { + Assert.assertEquals(record, ser.deserialize(ser.serialize(record), schema1)); + } catch (IOException e) { + e.printStackTrace(); + } + } + +} diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java b/external/storm-avro/src/test/java/org/apache/storm/avro/TestFixedAvroSerializer.java similarity index 96% rename from external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java rename to external/storm-avro/src/test/java/org/apache/storm/avro/TestFixedAvroSerializer.java index a584f911284..9f1b2c0a9b4 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestFixedAvroSerializer.java +++ b/external/storm-avro/src/test/java/org/apache/storm/avro/TestFixedAvroSerializer.java @@ -15,15 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.hdfs.avro; +package org.apache.storm.avro; import org.apache.avro.Schema; import org.junit.Assert; import org.junit.Test; -import java.util.ArrayList; -import java.util.List; - public class TestFixedAvroSerializer { //These should match FixedAvroSerializer.config in the test resources private static final String schemaString1 = "{\"type\":\"record\"," + diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java b/external/storm-avro/src/test/java/org/apache/storm/avro/TestGenericAvroSerializer.java similarity index 98% rename from external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java rename to external/storm-avro/src/test/java/org/apache/storm/avro/TestGenericAvroSerializer.java index ddfdcf5c33d..bb44b08dc4a 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/avro/TestGenericAvroSerializer.java +++ b/external/storm-avro/src/test/java/org/apache/storm/avro/TestGenericAvroSerializer.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.hdfs.avro; +package org.apache.storm.avro; import org.apache.avro.Schema; import org.junit.Assert; diff --git a/external/storm-hdfs/src/test/resources/FixedAvroSerializer.config b/external/storm-avro/src/test/resources/FixedAvroSerializer.config similarity index 100% rename from external/storm-hdfs/src/test/resources/FixedAvroSerializer.config rename to external/storm-avro/src/test/resources/FixedAvroSerializer.config diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md index e6c07ef31ea..98666c33d01 100644 --- a/external/storm-hdfs/README.md +++ b/external/storm-hdfs/README.md @@ -353,7 +353,7 @@ method is provided for this: By default Storm will use the ```GenericAvroSerializer``` to handle serialization. This will work, but there are much faster options available if you can pre-define the schemas you will be using or utilize an external schema registry. An implementation using the Confluent Schema Registry is provided, but others can be implemented and provided to Storm. -Please see the javadoc for classes in org.apache.storm.hdfs.avro for information about using the built-in options or +Please see the javadoc for classes in org.apache.storm.avro for information about using the built-in options or creating your own. diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml index b5874d0a992..bee69074cfc 100644 --- a/external/storm-hdfs/pom.xml +++ b/external/storm-hdfs/pom.xml @@ -35,13 +35,6 @@ - - - confluent - http://packages.confluent.io/maven - - - org.apache.storm @@ -196,15 +189,9 @@ test - io.confluent - kafka-avro-serializer - 1.0 - - - org.slf4j - slf4j-log4j12 - - + org.apache.storm + storm-avro + ${project.version} diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md index 0a076294f5b..e8769828400 100644 --- a/external/storm-kafka/README.md +++ b/external/storm-kafka/README.md @@ -97,6 +97,7 @@ The KafkaConfig class also has bunch of public variables that controls your appl ``` Most of them are self explanatory except MultiScheme. + ###MultiScheme MultiScheme is an interface that dictates how the ByteBuffer consumed from Kafka gets transformed into a storm tuple. It also controls the naming of your output field. @@ -360,6 +361,44 @@ For Trident: StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build()); ``` +## Avro Integration for Storm-Kafka + +To integrate with Avro you **must** register the appropriate Kryo serializers with your topology configuration. A convenience method is provided for this: + +`AvroUtils.addAvroKryoSerializations(conf);` + +###AvroScheme + +`AvroScheme` is an implementation of `Scheme`. You can use AvroScheme to read Avro GenericRecord from Kafka: + +```java +BrokerHosts hosts = new ZkHosts(zkConnString); +String schema = "your json format schema here"; +SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString()); +spoutConfig.scheme = new SchemeAsMultiScheme(new AvroScheme(schema)); +KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); +``` + +###DirectAvroTupleToKafkaMapper + +`DirectAvroTupleToKafkaMapper` is an implementation of `TupleToKafkaMapper`. You can use DirectAvroTupleToKafkaMapper to write Avro GenericRecord to Kafka: + +```java + //set producer properties. + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); + props.put("acks", "1"); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); + KafkaBolt bolt = new KafkaBolt() + .withProducerProperties(props) + .withTopicSelector(new DefaultTopicSelector("test")) + .withTupleToKafkaMapper(new DirectAvroTupleToKafkaMapper()); +``` + + *NOTE*: To use this mapper you **must** use `ByteArraySerializer` for Kafka `value.serializer` property setting. + + ## Committer Sponsors * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org)) diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml index 79824ef2831..dcd12e76840 100644 --- a/external/storm-kafka/pom.xml +++ b/external/storm-kafka/pom.xml @@ -61,6 +61,11 @@ src/test + + org.apache.storm + storm-avro + ${project.version} + commons-io commons-io diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/AvroScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/AvroScheme.java new file mode 100644 index 00000000000..048f7cfcf30 --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/AvroScheme.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericContainer; +import org.apache.storm.Config; +import org.apache.storm.avro.DefaultDirectAvroSerializer; +import org.apache.storm.avro.DirectAvroSerializer; +import org.apache.storm.spout.Scheme; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +public class AvroScheme implements Scheme { + public static final String AVRO_SCHEME_KEY = "avro"; + DirectAvroSerializer serializer = new DefaultDirectAvroSerializer(); + Schema schema; + + /** + * @param schemaString json format schema string + */ + public AvroScheme(String schemaString) { + schema = new Schema.Parser().parse(schemaString); + } + + /** + * for confluent avro schema registry + * + * @param url schema registry server url + * @param id schema id + */ + public AvroScheme(String url, int id) { + getRegistrySchema(url, id); + } + + /** + * @param id schema id + * @param stormConf storm configuration + */ + public AvroScheme(int id, Map stormConf) { + String url = (String) stormConf.get(Config.TOPOLOGY_AVRO_CONFLUENT_SCHEMA_REGISTRY_URL); + getRegistrySchema(url, id); + } + + private void getRegistrySchema(String url, int id) { + CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(url, 10000); + try { + schema = client.getByID(id); + } catch (IOException | RestClientException e) { + throw new RuntimeException(e); + } + } + + @Override + public List deserialize(ByteBuffer byteBuffer) { + try { + GenericContainer record = null; + if(byteBuffer.hasArray()) { + record = serializer.deserialize(byteBuffer.array(), schema); + } else { + record = serializer.deserialize(Utils.toByteArray(byteBuffer), schema); + } + return new Values(record); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public Fields getOutputFields() { + return new Fields(AVRO_SCHEME_KEY); + } +} diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/DirectAvroTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/DirectAvroTupleToKafkaMapper.java new file mode 100644 index 00000000000..0015ee668ef --- /dev/null +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/DirectAvroTupleToKafkaMapper.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.bolt.mapper; + +import java.io.IOException; + +import org.apache.avro.generic.GenericContainer; +import org.apache.storm.avro.DefaultDirectAvroSerializer; +import org.apache.storm.avro.DirectAvroSerializer; +import org.apache.storm.tuple.Tuple; + +public class DirectAvroTupleToKafkaMapper extends FieldNameBasedTupleToKafkaMapper { + + DirectAvroSerializer serializer = new DefaultDirectAvroSerializer(); + + public DirectAvroTupleToKafkaMapper() { + super(); + } + + public DirectAvroTupleToKafkaMapper(String boltKeyField, String boltMessageField) { + super(boltKeyField, boltMessageField); + } + + @Override + public byte[] getMessageFromTuple(Tuple tuple) { + Object obj = tuple.getValueByField(boltMessageField); + try { + return obj == null ? null : serializer.serialize((GenericContainer)obj); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TestAvroScheme.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TestAvroScheme.java new file mode 100644 index 00000000000..2784511097d --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TestAvroScheme.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericContainer; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.storm.avro.DefaultDirectAvroSerializer; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class TestAvroScheme { + private static final String schemaString1 = "{\"type\":\"record\"," + + "\"name\":\"stormtest1\"," + + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; + + AvroScheme avroSchema = new AvroScheme(schemaString1); + + @Test + public void testAvroSchema() { + GenericRecord record = new GenericData.Record(new Schema.Parser().parse(schemaString1)); + record.put("foo1", "xin"); + record.put("int1", 2016); + ByteBuffer byteBuffer = null; + try { + byteBuffer = ByteBuffer.wrap(new DefaultDirectAvroSerializer().serialize(record)); + } catch (IOException e) { + e.printStackTrace(); + } + Assert.assertEquals(record, (GenericContainer)avroSchema.deserialize(byteBuffer).get(0)); + } + +} diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/TestDirectAvroTupleToKafkaMapper.java b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/TestDirectAvroTupleToKafkaMapper.java new file mode 100644 index 00000000000..92aa5d53bf9 --- /dev/null +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/bolt/TestDirectAvroTupleToKafkaMapper.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka.bolt; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.storm.Config; +import org.apache.storm.avro.DefaultDirectAvroSerializer; +import org.apache.storm.kafka.bolt.mapper.DirectAvroTupleToKafkaMapper; +import org.apache.storm.task.GeneralTopologyContext; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.junit.Assert; +import org.junit.Test; + +public class TestDirectAvroTupleToKafkaMapper { + private static final String schemaString1 = "{\"type\":\"record\"," + + "\"name\":\"stormtest1\"," + + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; + + @Test + public void testMapper() { + GenericRecord record = new GenericData.Record(new Schema.Parser().parse(schemaString1)); + record.put("foo1", "xin"); + record.put("int1", 2016); + String key = "my_key"; + + DirectAvroTupleToKafkaMapper mapper = new DirectAvroTupleToKafkaMapper("bolt-key", "bolt-msg"); + Tuple tuple = generateTestTuple(key, record); + + Object _key = mapper.getKeyFromTuple(tuple); + byte[] _message = mapper.getMessageFromTuple(tuple); + + byte[] bytes = null; + try { + bytes = new DefaultDirectAvroSerializer().serialize(record); + } catch (IOException e) { + e.printStackTrace(); + } + + Assert.assertEquals(key, _key); + Assert.assertArrayEquals(bytes, _message); + } + + private Tuple generateTestTuple(Object key, Object msg) { + TopologyBuilder builder = new TopologyBuilder(); + GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(), + new Config(), new HashMap(), new HashMap(), new HashMap(), "") { + @Override + public Fields getComponentOutputFields(String componentId, String streamId) { + return new Fields("bolt-key", "bolt-msg"); + } + }; + return new TupleImpl(topologyContext, new Values(key, msg), 1, ""); + } +} diff --git a/pom.xml b/pom.xml index e8e7bb7e768..6cd5d5ee13d 100644 --- a/pom.xml +++ b/pom.xml @@ -276,6 +276,7 @@ storm-buildtools/storm-maven-plugins storm-core storm-rename-hack + external/storm-avro external/storm-kafka external/storm-hdfs external/storm-hbase diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index b725f875754..372383f296d 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -1725,6 +1725,18 @@ public class Config extends HashMap { @isBoolean public static final String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations"; + /** + * Class that specifies the avro serializer. + */ + @isString + public static final String TOPOLOGY_AVRO_SERIALIZER = "topology.avro.serializer"; + + /** + * The url that specifies the topology avro schema registry server. + */ + @isString + public static final String TOPOLOGY_AVRO_CONFLUENT_SCHEMA_REGISTRY_URL = "topology.avro.confluent.schema.registry.url"; + /** * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format). * Each listed class will be routed all the metrics data generated by the storm metrics API. diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml index a8c6d412981..672240257e1 100644 --- a/storm-dist/binary/src/main/assembly/binary.xml +++ b/storm-dist/binary/src/main/assembly/binary.xml @@ -69,6 +69,20 @@ bin + + ${project.basedir}/../../external/storm-avro/target + external/storm-avro + + storm*jar + + + + ${project.basedir}/../../external/storm-avro + external/storm-avro + + README.* + + ${project.basedir}/../../external/storm-kafka/target external/storm-kafka