diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java index 9ef7f741804..161291fda48 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java @@ -17,12 +17,17 @@ */ package org.apache.storm.kafka; +import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import com.google.common.collect.ImmutableMap; import java.nio.ByteBuffer; import java.util.List; +import static org.apache.storm.kafka.StringScheme.deserializeString; +import static org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper.BOLT_KEY; +import static org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE; + public class StringKeyValueScheme extends StringScheme implements KeyValueScheme { @Override @@ -30,9 +35,12 @@ public List deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) { if ( key == null ) { return deserialize(value); } - String keyString = StringScheme.deserializeString(key); - String valueString = StringScheme.deserializeString(value); - return new Values(ImmutableMap.of(keyString, valueString)); + return new Values(ImmutableMap.of(deserializeString(key), deserializeString(value))); + } + + @Override + public Fields getOutputFields() { + return new Fields(BOLT_KEY, BOLT_MESSAGE); } } diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java index 7e5ff00077d..bc5537a7de3 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/StringKeyValueSchemeTest.java @@ -17,7 +17,6 @@ */ package org.apache.storm.kafka; -import org.apache.storm.tuple.Fields; import com.google.common.collect.ImmutableMap; import org.junit.Test; @@ -25,8 +24,10 @@ import java.nio.charset.Charset; import java.util.Collections; +import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper.BOLT_KEY; +import static org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE; public class StringKeyValueSchemeTest { @@ -39,9 +40,7 @@ public void testDeserialize() throws Exception { @Test public void testGetOutputFields() throws Exception { - Fields outputFields = scheme.getOutputFields(); - assertTrue(outputFields.contains(StringScheme.STRING_SCHEME_KEY)); - assertEquals(1, outputFields.size()); + assertEquals(asList(BOLT_KEY, BOLT_MESSAGE), scheme.getOutputFields().toList()); } @Test