diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java index e420cb35b93..20f30a4948c 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java @@ -18,14 +18,25 @@ package org.apache.storm.kafka.trident; import com.google.common.base.Objects; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.storm.kafka.Broker; import org.apache.storm.kafka.Partition; +import org.json.simple.JSONAware; import java.io.Serializable; -import java.util.*; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; -public class GlobalPartitionInformation implements Iterable, Serializable { +public class GlobalPartitionInformation implements Iterable, Serializable, JSONAware { + private static final ObjectMapper json = new ObjectMapper(); + private Map partitionMap; public String topic; @@ -113,4 +124,13 @@ public boolean equals(Object obj) { final GlobalPartitionInformation other = (GlobalPartitionInformation) obj; return Objects.equal(this.partitionMap, other.partitionMap); } + + @Override + public String toJSONString() { + try { + return json.writeValueAsString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } } diff --git a/storm-core/pom.xml b/storm-core/pom.xml index b6424d70679..5ebfb37a7b9 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -333,6 +333,10 @@ com.fasterxml.jackson.dataformat jackson-dataformat-smile + + com.fasterxml.jackson.core + jackson-databind + org.apache.curator curator-client