77 changes: 77 additions & 0 deletions external/storm-avro/README.md
@@ -0,0 +1,77 @@
# Storm Avro

Storm integration for [Apache Avro](http://avro.apache.org/).

## GenericAvroSerializer & FixedAvroSerializer & ConfluentAvroSerializer
These serializers are implementations of `AbstractAvroSerializer`.
To pass Avro `GenericRecord` objects from one worker to another you **must** register the appropriate Kryo serializers with your topology configuration. A convenience method is provided for this:

`AvroUtils.addAvroKryoSerializations(conf);`

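By default Storm uses the `GenericAvroSerializer` to handle serialization. This works, but faster options are available if you can pre-define the schemas you will be using or use an external schema registry. An implementation that uses the Confluent Schema Registry is provided, and you can implement others and supply them to Storm.
The serializer class is selected with the configuration property `Config.TOPOLOGY_AVRO_SERIALIZER`, which should hold the complete class name of the serializer. `ConfluentAvroSerializer` additionally reads the registry URL, in the form `http://HOST:PORT`, from `Config.TOPOLOGY_AVRO_CONFLUENT_SCHEMA_REGISTRY_URL`.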
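
The selection rule described above (use the configured class name if present, otherwise fall back to a default) can be sketched in plain Java. This is a minimal, stdlib-only sketch under stated assumptions, not Storm's implementation: `SerializerLookupSketch`, `resolveSerializer`, and the key `example.avro.serializer` are hypothetical names. In Storm the key is the value of `Config.TOPOLOGY_AVRO_SERIALIZER` and the fallback is `GenericAvroSerializer`. JDK classes stand in for serializer classes so the example runs without Storm on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the lookup performed when registering the Avro Kryo serializer.
class SerializerLookupSketch {

    // Assumption: placeholder key; Storm uses the value of Config.TOPOLOGY_AVRO_SERIALIZER.
    static final String SERIALIZER_KEY = "example.avro.serializer";

    // Returns the class named in the configuration, or defaultClass when the key is absent.
    static Class<?> resolveSerializer(Map<String, Object> conf, Class<?> defaultClass) {
        if (!conf.containsKey(SERIALIZER_KEY)) {
            return defaultClass;
        }
        String className = (String) conf.get(SERIALIZER_KEY);
        try {
            return Class.forName(className);
        } catch (ClassNotFoundException e) {
            // Wrapped here only to keep the sketch short.
            throw new IllegalArgumentException("Serializer class not found: " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();

        // No key set: the default class is used.
        System.out.println(resolveSerializer(conf, StringBuilder.class).getName());
        // prints "java.lang.StringBuilder"

        // Key set: the fully qualified class name is loaded reflectively.
        conf.put(SERIALIZER_KEY, "java.lang.StringBuffer");
        System.out.println(resolveSerializer(conf, StringBuilder.class).getName());
        // prints "java.lang.StringBuffer"
    }
}
```

Because the class is loaded by name, a misspelled or missing class only fails when the lookup runs, so it is worth checking the configured name against the classpath of the topology jar.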

Please see the Javadoc for the classes in `org.apache.storm.avro` for information about using the built-in options or creating your own.


### AvroSchemaRegistry

```java
public interface AvroSchemaRegistry extends Serializable {
    String getFingerprint(Schema schema);

    Schema getSchema(String fingerPrint);
}
```
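
To illustrate the contract of the two methods, here is a minimal in-memory sketch of a fingerprint-to-schema lookup. It is an assumption-laden analog rather than one of the provided implementations: `InMemorySchemaRegistrySketch` is a hypothetical class, schemas are represented as JSON strings instead of Avro `Schema` objects so the code runs on the JDK alone, and SHA-256 with Base64 encoding is an arbitrary choice of fingerprint for the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

// Hypothetical analog of AvroSchemaRegistry for a fixed, pre-defined set of schemas.
final class InMemorySchemaRegistrySketch {

    // fingerprint -> schema JSON; filled once at construction time.
    private final Map<String, String> schemasByFingerprint = new HashMap<>();

    InMemorySchemaRegistrySketch(String... schemaJsons) {
        for (String json : schemaJsons) {
            schemasByFingerprint.put(getFingerprint(json), json);
        }
    }

    // Assumption: SHA-256 over the UTF-8 bytes of the JSON text, Base64-encoded.
    String getFingerprint(String schemaJson) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(schemaJson.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(hash);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException(e);
        }
    }

    // Returns the registered schema for a fingerprint, or null if it is unknown.
    String getSchema(String fingerprint) {
        return schemasByFingerprint.get(fingerprint);
    }

    public static void main(String[] args) {
        String schema = "{\"type\":\"string\"}";
        InMemorySchemaRegistrySketch registry = new InMemorySchemaRegistrySketch(schema);
        String fingerprint = registry.getFingerprint(schema);
        System.out.println(schema.equals(registry.getSchema(fingerprint))); // prints "true"
    }
}
```

The design point is that only the short fingerprint has to travel with each record; the receiving side recovers the full schema from its own copy of the registry. Hashing raw JSON text, as done here, treats two differently formatted copies of the same schema as different schemas, so an Avro-based implementation would presumably fingerprint a normalized form instead.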

### AbstractAvroSerializer

```java
public abstract class AbstractAvroSerializer extends Serializer<GenericContainer> implements AvroSchemaRegistry {

    @Override
    public void write(Kryo kryo, Output output, GenericContainer record) { }

    @Override
    public GenericContainer read(Kryo kryo, Input input, Class<GenericContainer> aClass) { }
}
```

## DirectAvroSerializer

`DirectAvroSerializer` provides the ability to serialize an Avro `GenericContainer` directly to and from a byte array.

```java
public interface DirectAvroSerializer extends Serializable {

    public byte[] serialize(GenericContainer record) throws IOException;

    public GenericContainer deserialize(byte[] bytes, Schema schema) throws IOException;

}
```

## License

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

## Committer Sponsors

* Aaron Niskode-Dossett ([dossett@gmail.com](mailto:dossett@gmail.com))

97 changes: 97 additions & 0 deletions external/storm-avro/pom.xml
@@ -0,0 +1,97 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>storm</artifactId>
        <groupId>org.apache.storm</groupId>
        <version>2.0.0-SNAPSHOT</version>
        <relativePath>../../pom.xml</relativePath>
    </parent>

    <artifactId>storm-avro</artifactId>

    <developers>
        <developer>
            <id>dossett</id>
            <name>Aaron Niskode-Dossett</name>
            <email>dossett@gmail.com</email>
        </developer>
        <developer>
            <id>vesense</id>
            <name>Xin Wang</name>
            <email>data.xinwang@gmail.com</email>
        </developer>
    </developers>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>http://packages.confluent.io/maven</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.7</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
        </dependency>
        <!--test dependencies -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.hdfs.avro;
package org.apache.storm.avro;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.hdfs.avro;
package org.apache.storm.avro;

import org.apache.avro.Schema;

@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.hdfs.avro;
package org.apache.storm.avro;

import org.apache.avro.generic.GenericData;
import org.apache.storm.Config;
@@ -25,15 +25,15 @@ public class AvroUtils {
* A helper method to extract avro serialization configurations from the topology configuration and register
* specific kryo serializers as necessary. A default serializer will be provided if none is specified in the
* configuration. "avro.serializer" should specify the complete class name of the serializer, e.g.
* "org.apache.stgorm.hdfs.avro.GenericAvroSerializer"
* "org.apache.storm.avro.GenericAvroSerializer"
*
* @param conf The topology configuration
* @throws ClassNotFoundException If the specified serializer cannot be located.
*/
public static void addAvroKryoSerializations(Config conf) throws ClassNotFoundException {
final Class serializerClass;
if (conf.containsKey("avro.serializer")) {
serializerClass = Class.forName((String)conf.get("avro.serializer"));
if (conf.containsKey(Config.TOPOLOGY_AVRO_SERIALIZER)) {
serializerClass = Class.forName((String)conf.get(Config.TOPOLOGY_AVRO_SERIALIZER));
}
else {
serializerClass = GenericAvroSerializer.class;
@@ -15,13 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.hdfs.avro;
package org.apache.storm.avro;

import com.esotericsoftware.kryo.Kryo;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

import org.apache.avro.Schema;
import org.apache.storm.Config;

import java.io.IOException;
import java.util.Map;
@@ -50,11 +53,11 @@ public class ConfluentAvroSerializer extends AbstractAvroSerializer {
* See Storm's SerializationFactory class for details
*
* @param k Unused but needs to be present for Serialization Factory to find this constructor
* @param stormConf The global storm configuration. Must define "avro.schemaregistry.confluent" to locate the
* @param stormConf The global storm configuration. Must define "topology.avro.confluent.schema.registry.url" to locate the
* confluent schema registry. Should be in the form of "http://HOST:PORT"
*/
public ConfluentAvroSerializer(Kryo k, Map stormConf) {
url = (String) stormConf.get("avro.schemaregistry.confluent");
url = (String) stormConf.get(Config.TOPOLOGY_AVRO_CONFLUENT_SCHEMA_REGISTRY_URL);
this.theClient = new CachedSchemaRegistryClient(this.url, 10000);
}

@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.avro;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class DefaultDirectAvroSerializer implements DirectAvroSerializer {

    @Override
    public byte[] serialize(GenericContainer record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<GenericContainer> writer = new GenericDatumWriter<GenericContainer>(
                record.getSchema());
        Encoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    }

    @Override
    public GenericContainer deserialize(byte[] bytes, Schema schema)
            throws IOException {
        DatumReader<GenericContainer> reader = new GenericDatumReader<GenericContainer>(
                schema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        GenericContainer record = reader.read(null, decoder);
        return record;
    }

}
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.avro;

import java.io.IOException;
import java.io.Serializable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;

public interface DirectAvroSerializer extends Serializable {

public byte[] serialize(GenericContainer record) throws IOException;

public GenericContainer deserialize(byte[] bytes, Schema schema) throws IOException;

}
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.hdfs.avro;
package org.apache.storm.avro;

import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.hdfs.avro;
package org.apache.storm.avro;

import org.apache.avro.Schema;
