diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml index 21738a554cd..2c8300b6379 100755 --- a/external/storm-eventhubs/pom.xml +++ b/external/storm-eventhubs/pom.xml @@ -26,7 +26,6 @@ storm-eventhubs - 2.0.0-SNAPSHOT jar storm-eventhubs EventHubs Storm Spout @@ -94,6 +93,11 @@ eventhubs-client ${eventhubs.client.version} + + com.microsoft.azure + azure-eventhubs + 0.10.0 + org.apache.storm storm-core diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index 3d64cc5624e..d3fc36ab4b8 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -22,10 +22,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.microsoft.eventhubs.client.EventHubClient; +import com.microsoft.azure.eventhubs.*; import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.eventhubs.client.EventHubSender; - import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -41,7 +39,8 @@ public class EventHubBolt extends BaseRichBolt { .getLogger(EventHubBolt.class); protected OutputCollector collector; - protected EventHubSender sender; + protected PartitionSender sender=null; + protected EventHubClient ehClient=null; protected EventHubBoltConfig boltConfig; public EventHubBolt(String connectionString, String entityPath) { @@ -70,10 +69,9 @@ public void prepare(Map config, TopologyContext context, logger.info("creating sender: " + boltConfig.getConnectionString() + ", " + boltConfig.getEntityPath() + ", " + myPartitionId); try { - EventHubClient eventHubClient = EventHubClient.create( - boltConfig.getConnectionString(), - boltConfig.getEntityPath()); - sender = eventHubClient.createPartitionSender(myPartitionId); + ehClient = EventHubClient.createFromConnectionString(boltConfig.getConnectionString()).get(); + if (boltConfig.getPartitionMode()) + sender = ehClient.createPartitionSender(Integer.toString(context.getThisTaskIndex())).get(); } catch (Exception ex) { collector.reportError(ex); throw new RuntimeException(ex); @@ -84,12 +82,39 @@ public void prepare(Map config, TopologyContext context, @Override public void execute(Tuple tuple) { try { - sender.send(boltConfig.getEventDataFormat().serialize(tuple)); + EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple)); + if(boltConfig.getPartitionMode() && sender!=null) + sender.send(sendEvent).get(); + else if(boltConfig.getPartitionMode() && sender==null) + throw new EventHubException("Sender is null"); + else if(!boltConfig.getPartitionMode() && ehClient!=null) + ehClient.send(sendEvent).get(); + else if(!boltConfig.getPartitionMode() && ehClient==null) + throw new EventHubException("ehclient is null"); collector.ack(tuple); } catch (EventHubException ex) { collector.reportError(ex); collector.fail(tuple); } + catch (Exception e){ + + } + } + + @Override + public void cleanup() { + try{ + if(sender!=null) { + sender.close().get(); + sender = null; + } + if(ehClient!=null){ + ehClient.close().get(); + ehClient = null; + } + }catch (Exception e){ + logger.error("Exception occured during close phase"+e.toString()); + } } @Override diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java index 10b4e393e7c..41a2c3691e7 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java @@ -20,7 +20,7 @@ import java.io.Serializable; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; -import com.microsoft.eventhubs.client.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.ConnectionStringBuilder; /* * EventHubs bolt configurations @@ -81,8 +81,8 @@ public EventHubBoltConfig(String userName, String password, String namespace, public EventHubBoltConfig(String userName, String password, String namespace, String targetFqnAddress, String entityPath, boolean partitionMode, IEventDataFormat dataFormat) { - this.connectionString = new ConnectionStringBuilder(userName, password, - namespace, targetFqnAddress).getConnectionString(); + this.connectionString = new ConnectionStringBuilder(namespace,entityPath, + userName,password).toString(); this.entityPath = entityPath; this.partitionMode = partitionMode; this.dataFormat = dataFormat; diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java index 5f9acbd344c..c3be056c880 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java @@ -17,7 +17,11 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.PartitionReceiver; import org.apache.qpid.amqp_1_0.client.Message; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.messaging.Data; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +34,10 @@ import com.microsoft.eventhubs.client.IEventHubFilter; import com.microsoft.eventhubs.client.ResilientEventHubReceiver; +import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.qpid.amqp_1_0.type.Section; @@ -39,16 +46,14 @@ public class EventHubReceiverImpl implements IEventHubReceiver { private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class); - private static final Symbol OffsetKey = Symbol.valueOf(Constants.OffsetKey); - private static final Symbol SequenceNumberKey = Symbol.valueOf(Constants.SequenceNumberKey); private final String connectionString; private final String entityName; private final String partitionId; - private final int defaultCredits; private final String consumerGroupName; - private ResilientEventHubReceiver receiver; + private PartitionReceiver receiver; + private EventHubClient ehClient=null; private ReducedMetric receiveApiLatencyMean; private CountMetric receiveApiCallCount; private CountMetric receiveMessageCount; @@ -56,7 +61,6 @@ public class EventHubReceiverImpl implements IEventHubReceiver { public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) { this.connectionString = config.getConnectionString(); this.entityName = config.getEntityPath(); - this.defaultCredits = config.getReceiverCredits(); this.partitionId = partitionId; this.consumerGroupName = config.getConsumerGroupName(); receiveApiLatencyMean = new ReducedMetric(new MeanReducer()); @@ -65,24 +69,38 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) { } @Override - public void open(IEventHubFilter filter) throws EventHubException { - logger.info("creating eventhub receiver: partitionId=" + partitionId + - ", filterString=" + filter.getFilterString()); + public void open(String offset) throws EventHubException { + logger.info("creating eventhub receiver: partitionId=" + partitionId + + ", offset=" + offset); long start = System.currentTimeMillis(); - receiver = new ResilientEventHubReceiver(connectionString, entityName, - partitionId, consumerGroupName, defaultCredits, filter); - receiver.initialize(); - + try { + ehClient = EventHubClient.createFromConnectionString(connectionString).get(); + receiver = ehClient.createEpochReceiver( + consumerGroupName, + partitionId, + offset, + false, + 1).get(); + }catch (Exception e){ + logger.info("Exception in creating EventhubClient"+e.toString()); + } long end = System.currentTimeMillis(); logger.info("created eventhub receiver, time taken(ms): " + (end-start)); } @Override - public void close() { + public void close(){ if(receiver != null) { - receiver.close(); + try { + receiver.close().get(); + if(ehClient!=null) + ehClient.close().get(); + }catch (Exception e){ + logger.error("Exception occured during close phase"+e.toString()); + } logger.info("closed eventhub receiver: partitionId=" + partitionId ); receiver = null; + ehClient = null; } } @@ -92,50 +110,34 @@ public boolean isOpen() { } @Override - public EventData receive(long timeoutInMilliseconds) { + public EventData receive() { long start = System.currentTimeMillis(); - Message message = receiver.receive(timeoutInMilliseconds); + Iterable receivedEvents=null; + /*Get one message at a time for backward compatibility behaviour*/ + try { + receivedEvents = receiver.receive(1).get(); + }catch (Exception e){ + logger.error("Exception occured during receive"+e.toString()); + } long end = System.currentTimeMillis(); long millis = (end - start); receiveApiLatencyMean.update(millis); receiveApiCallCount.incr(); - - if (message == null) { - //Temporary workaround for AMQP/EH bug of failing to receive messages - /*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) { - throw new RuntimeException( - "Restart EventHubSpout due to failure of receiving messages in " - + millis + " millisecond"); - }*/ + if (receivedEvents == null) { return null; } - receiveMessageCount.incr(); - - MessageId messageId = createMessageId(message); - return EventData.create(message, messageId); - } - - private MessageId createMessageId(Message message) { - String offset = null; - long sequenceNumber = 0; - - for (Section section : message.getPayload()) { - if (section instanceof MessageAnnotations) { - MessageAnnotations annotations = (MessageAnnotations) section; - HashMap annonationMap = (HashMap) annotations.getValue(); - - if (annonationMap.containsKey(OffsetKey)) { - offset = (String) annonationMap.get(OffsetKey); - } - - if (annonationMap.containsKey(SequenceNumberKey)) { - sequenceNumber = (Long) annonationMap.get(SequenceNumberKey); - } - } + MessageId messageId=null; + Message message=null; + for (com.microsoft.azure.eventhubs.EventData receivedEvent : receivedEvents) { + messageId = new MessageId(partitionId, + receivedEvent.getSystemProperties().getOffset(), + receivedEvent.getSystemProperties().getSequenceNumber()); + List
body = new ArrayList
(); + body.add(new Data(new Binary((new String(receivedEvent.getBody(), Charset.defaultCharset())).getBytes()))); + message = new Message(body); } - - return MessageId.create(partitionId, offset, sequenceNumber); + return org.apache.storm.eventhubs.spout.EventData.create(message, messageId); } @Override diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java index e06953aa05b..3bbfa525033 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java @@ -20,7 +20,8 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import com.microsoft.eventhubs.client.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.servicebus.ConnectionStringBuilder; public class EventHubSpoutConfig implements Serializable { private static final long serialVersionUID = 1L; @@ -42,8 +43,7 @@ public class EventHubSpoutConfig implements Serializable { private String connectionString; private String topologyName; private IEventDataScheme scheme = new StringEventDataScheme(); - private String consumerGroupName = null; // if null then use default - // consumer group + private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME; private String outputStreamId; @@ -52,8 +52,8 @@ public EventHubSpoutConfig(String username, String password, String namespace, String entityPath, int partitionCount) { this.userName = username; this.password = password; - this.connectionString = new ConnectionStringBuilder(username, password, - namespace).getConnectionString(); + this.connectionString = new ConnectionStringBuilder(namespace,entityPath, + username,password).toString(); this.namespace = namespace; this.entityPath = entityPath; this.partitionCount = partitionCount; @@ -232,9 +232,12 @@ public String getConnectionString() { return connectionString; } + /*Keeping it for backward compatibility*/ public void setTargetAddress(String targetFqnAddress) { - this.connectionString = new ConnectionStringBuilder(userName, password, - namespace, targetFqnAddress).getConnectionString(); + } + + public void setTargetAddress(){ + } public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) { diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java index bc2db14101f..0ee024c7dfb 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java @@ -24,13 +24,13 @@ public interface IEventHubReceiver { - void open(IEventHubFilter filter) throws EventHubException; + void open(String offset) throws EventHubException; void close(); - + boolean isOpen(); - EventData receive(long timeoutInMilliseconds); - + EventData receive(); + Map getMetricsData(); } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java index 105474209e5..0ce52ea9b26 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java @@ -53,7 +53,7 @@ public EventData receive() { EventData eventData; if (toResend.isEmpty()) { - eventData = receiver.receive(ehReceiveTimeoutMs); + eventData = receiver.receive(); } else { eventData = toResend.pollFirst(); } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java index b66a7850818..68bad3c2046 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java @@ -65,16 +65,13 @@ public void open() throws Exception { offset = Constants.DefaultStartingOffset; } - IEventHubFilter filter; if (offset.equals(Constants.DefaultStartingOffset) && config.getEnqueueTimeFilter() != 0) { - filter = new EventHubEnqueueTimeFilter(config.getEnqueueTimeFilter()); - } - else { - filter = new EventHubOffsetFilter(offset); + offset = Long.toString(config.getEnqueueTimeFilter()); } - receiver.open(filter); + + receiver.open(offset); } @Override @@ -99,7 +96,7 @@ protected String getCompletedOffset() { @Override public EventData receive() { - EventData eventData = receiver.receive(5000); + EventData eventData = receiver.receive(); if (eventData != null) { lastOffset = eventData.getMessageId().getOffset(); } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java index 159fe418c9d..469b3dc514e 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java @@ -49,10 +49,10 @@ public boolean open(String offset) { try { if((offset == null || offset.equals(Constants.DefaultStartingOffset)) && spoutConfig.getEnqueueTimeFilter() != 0) { - receiver.open(new EventHubEnqueueTimeFilter(spoutConfig.getEnqueueTimeFilter())); + receiver.open(offset); } else { - receiver.open(new EventHubOffsetFilter(offset)); + receiver.open(offset); } lastOffset = offset; return true; @@ -81,7 +81,7 @@ public List receiveBatch(String offset, int count) { } for(int i=0; i