From e3545aa692673bd379d88d5009c2b8662a9817ec Mon Sep 17 00:00:00 2001 From: Ranjan Banerjee Date: Thu, 2 Mar 2017 19:28:49 -0800 Subject: [PATCH 1/2] STORM-2371: New eventhub implementation --- external/storm-eventhubs/pom.xml | 5 + .../storm/eventhubs/bolt/EventHubBolt.java | 58 +++++++-- .../eventhubs/bolt/EventHubBoltConfig.java | 6 +- .../spout/BinaryEventDataScheme.java | 19 +-- .../eventhubs/spout/EventDataScheme.java | 28 +---- .../{EventData.java => EventDataWrap.java} | 96 +++++++-------- .../eventhubs/spout/EventHubReceiverImpl.java | 110 +++++++++--------- .../storm/eventhubs/spout/EventHubSpout.java | 17 ++- .../eventhubs/spout/EventHubSpoutConfig.java | 17 +-- .../eventhubs/spout/IEventDataScheme.java | 6 +- .../eventhubs/spout/IEventHubReceiver.java | 9 +- .../eventhubs/spout/IPartitionManager.java | 2 +- .../eventhubs/spout/PartitionManager.java | 28 ++--- .../spout/SimplePartitionManager.java | 21 ++-- .../spout/StringEventDataScheme.java | 27 +---- .../trident/ITridentPartitionManager.java | 6 +- .../TransactionalTridentEventHubEmitter.java | 14 +-- .../trident/TridentPartitionManager.java | 14 +-- .../eventhubs/spout/EventHubReceiverMock.java | 30 ++--- .../spout/PartitionManagerCallerMock.java | 6 +- .../storm/eventhubs/spout/TestEventData.java | 4 +- .../eventhubs/spout/TestEventHubSpout.java | 2 +- 22 files changed, 250 insertions(+), 275 deletions(-) rename external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/{EventData.java => EventDataWrap.java} (72%) mode change 100755 => 100644 diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml index 21738a554cd..a4aecfba2b0 100755 --- a/external/storm-eventhubs/pom.xml +++ b/external/storm-eventhubs/pom.xml @@ -94,6 +94,11 @@ eventhubs-client ${eventhubs.client.version} + + com.microsoft.azure + azure-eventhubs + 0.11.0 + org.apache.storm storm-core diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index 3d64cc5624e..79ce0b1e6cc 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -18,14 +18,16 @@ package org.apache.storm.eventhubs.bolt; import java.util.Map; +import java.util.concurrent.ExecutionException; +import com.microsoft.azure.servicebus.ServiceBusException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.microsoft.eventhubs.client.EventHubClient; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.PartitionSender; +import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.eventhubs.client.EventHubSender; - import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -41,7 +43,8 @@ public class EventHubBolt extends BaseRichBolt { .getLogger(EventHubBolt.class); protected OutputCollector collector; - protected EventHubSender sender; + protected PartitionSender sender=null; + protected EventHubClient ehClient=null; protected EventHubBoltConfig boltConfig; public EventHubBolt(String connectionString, String entityPath) { @@ -70,10 +73,9 @@ public void prepare(Map config, TopologyContext context, logger.info("creating sender: " + boltConfig.getConnectionString() + ", " + boltConfig.getEntityPath() + ", " + myPartitionId); try { - EventHubClient eventHubClient = EventHubClient.create( - boltConfig.getConnectionString(), - boltConfig.getEntityPath()); - sender = eventHubClient.createPartitionSender(myPartitionId); + ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString()); + if (boltConfig.getPartitionMode()) + sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex())); } catch (Exception ex) { collector.reportError(ex); throw new RuntimeException(ex); @@ -84,11 +86,47 @@ public void prepare(Map config, TopologyContext context, @Override public void execute(Tuple tuple) { try { - sender.send(boltConfig.getEventDataFormat().serialize(tuple)); + EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple)); + if(boltConfig.getPartitionMode() && sender!=null) + sender.sendSync(sendEvent); + else if(boltConfig.getPartitionMode() && sender==null) + throw new EventHubException("Sender is null"); + else if(!boltConfig.getPartitionMode() && ehClient!=null) + ehClient.sendSync(sendEvent); + else if(!boltConfig.getPartitionMode() && ehClient==null) + throw new EventHubException("ehclient is null"); collector.ack(tuple); - } catch (EventHubException ex) { + } catch (EventHubException ex ) { collector.reportError(ex); collector.fail(tuple); + }catch (ServiceBusException e){ + collector.reportError(e); + collector.fail(tuple); + } + } + + @Override + public void cleanup() { + if(sender != null) { + try { + sender.close().whenComplete((voidargs,error)->{ + try{ + if(error!=null){ + logger.error("Exception during sender cleanup phase"+error.toString()); + } + ehClient.closeSync(); + }catch (Exception e){ + logger.error("Exception during ehclient cleanup phase"+e.toString()); + } + }).get(); + }catch (InterruptedException e){ + logger.error("Exception occured during cleanup phase"+e.toString()); + }catch (ExecutionException e){ + logger.error("Exception occured during cleanup phase"+e.toString()); + } + logger.info("Eventhub Bolt cleaned up"); + sender = null; + ehClient = null; } } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java index 10b4e393e7c..41a2c3691e7 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java @@ -20,7 +20,7 @@ import java.io.Serializable; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; -import com.microsoft.eventhubs.client.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.ConnectionStringBuilder; /* * EventHubs bolt configurations @@ -81,8 +81,8 @@ public EventHubBoltConfig(String userName, String password, String namespace, public EventHubBoltConfig(String userName, String password, String namespace, String targetFqnAddress, String entityPath, boolean partitionMode, IEventDataFormat dataFormat) { - this.connectionString = new ConnectionStringBuilder(userName, password, - namespace, targetFqnAddress).getConnectionString(); + this.connectionString = new ConnectionStringBuilder(namespace,entityPath, + userName,password).toString(); this.entityPath = entityPath; this.partitionMode = partitionMode; this.dataFormat = dataFormat; diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java index 7b0d7e54845..958597ed7c7 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java @@ -17,6 +17,7 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; +import com.microsoft.azure.eventhubs.EventData; import org.apache.qpid.amqp_1_0.client.Message; import org.apache.qpid.amqp_1_0.type.Section; import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; @@ -38,22 +39,10 @@ public class BinaryEventDataScheme implements IEventDataScheme { @Override - public List deserialize(Message message) { + public List deserialize(EventData eventData) { final List fieldContents = new ArrayList(); - - Map metaDataMap = new HashMap(); - byte[] messageData = new byte[0]; - - for (Section section : message.getPayload()) { - if (section instanceof Data) { - Data data = (Data) section; - messageData = data.getValue().getArray(); - } else if (section instanceof ApplicationProperties) { - final ApplicationProperties applicationProperties = (ApplicationProperties) section; - metaDataMap = applicationProperties.getValue(); - } - } - + byte[] messageData = eventData.getBody(); + Map metaDataMap = eventData.getProperties(); fieldContents.add(messageData); fieldContents.add(metaDataMap); return fieldContents; diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java index 90cad0a899e..5e67b9e2fd6 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java @@ -22,12 +22,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - -import org.apache.qpid.amqp_1_0.client.Message; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Data; +import java.nio.charset.Charset; +import com.microsoft.azure.eventhubs.EventData; /** * An Event Data Scheme which deserializes message payload into the Strings. No @@ -46,25 +42,11 @@ public class EventDataScheme implements IEventDataScheme { private static final long serialVersionUID = 1L; @Override - public List deserialize(Message message) { + public List deserialize(EventData eventData) { final List fieldContents = new ArrayList(); - - Map metaDataMap = new HashMap(); String messageData = ""; - - for (Section section : message.getPayload()) { - if (section instanceof Data) { - Data data = (Data) section; - messageData = new String(data.getValue().getArray()); - } else if (section instanceof AmqpValue) { - AmqpValue amqpValue = (AmqpValue) section; - messageData = amqpValue.getValue().toString(); - } else if (section instanceof ApplicationProperties) { - final ApplicationProperties applicationProperties = (ApplicationProperties) section; - metaDataMap = applicationProperties.getValue(); - } - } - + messageData = new String (eventData.getBody(),eventData.getBodyOffset(),eventData.getBodyLength(),Charset.defaultCharset()); + Map metaDataMap = eventData.getProperties(); fieldContents.add(messageData); fieldContents.add(metaDataMap); return fieldContents; diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java old mode 100755 new mode 100644 similarity index 72% rename from external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java rename to external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java index e5834b4f561..5eeb4d26956 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java @@ -1,48 +1,48 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.spout; - -import org.apache.qpid.amqp_1_0.client.Message; - -public class EventData implements Comparable { - private final Message message; - private final MessageId messageId; - - public EventData(Message message, MessageId messageId) { - this.message = message; - this.messageId = messageId; - } - - public static EventData create(Message message, MessageId messageId) { - return new EventData(message, messageId); - } - - public Message getMessage() { - return this.message; - } - - public MessageId getMessageId() { - return this.messageId; - } - - @Override - public int compareTo(EventData ed) { - return messageId.getSequenceNumber(). - compareTo(ed.getMessageId().getSequenceNumber()); - } -} +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.storm.eventhubs.spout; + +import com.microsoft.azure.eventhubs.EventData; + +public class EventDataWrap implements Comparable { + private final EventData eventData; + private final MessageId messageId; + + public EventDataWrap(EventData eventdata, MessageId messageId) { + this.eventData = eventdata; + this.messageId = messageId; + } + + public static EventDataWrap create(EventData eventData, MessageId messageId) { + return new EventDataWrap(eventData, messageId); + } + + public EventData getEventData() { + return this.eventData; + } + + public MessageId getMessageId() { + return this.messageId; + } + + @Override + public int compareTo(EventDataWrap ed) { + return messageId.getSequenceNumber(). + compareTo(ed.getMessageId().getSequenceNumber()); + } +} diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java index 5f9acbd344c..56c225fdbb4 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java @@ -17,7 +17,8 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import org.apache.qpid.amqp_1_0.client.Message; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.PartitionReceiver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,28 +28,23 @@ import com.microsoft.eventhubs.client.Constants; import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.eventhubs.client.IEventHubFilter; -import com.microsoft.eventhubs.client.ResilientEventHubReceiver; +import com.microsoft.azure.eventhubs.EventData; +import java.util.concurrent.ExecutionException; import java.util.HashMap; +import java.util.List; import java.util.Map; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; - public class EventHubReceiverImpl implements IEventHubReceiver { private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class); - private static final Symbol OffsetKey = Symbol.valueOf(Constants.OffsetKey); - private static final Symbol SequenceNumberKey = Symbol.valueOf(Constants.SequenceNumberKey); private final String connectionString; private final String entityName; private final String partitionId; - private final int defaultCredits; private final String consumerGroupName; - private ResilientEventHubReceiver receiver; + private PartitionReceiver receiver; + private EventHubClient ehClient=null; private ReducedMetric receiveApiLatencyMean; private CountMetric receiveApiCallCount; private CountMetric receiveMessageCount; @@ -56,7 +52,6 @@ public class EventHubReceiverImpl implements IEventHubReceiver { public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) { this.connectionString = config.getConnectionString(); this.entityName = config.getEntityPath(); - this.defaultCredits = config.getReceiverCredits(); this.partitionId = partitionId; this.consumerGroupName = config.getConsumerGroupName(); receiveApiLatencyMean = new ReducedMetric(new MeanReducer()); @@ -65,26 +60,50 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) { } @Override - public void open(IEventHubFilter filter) throws EventHubException { - logger.info("creating eventhub receiver: partitionId=" + partitionId + - ", filterString=" + filter.getFilterString()); + public void open(String offset) throws EventHubException { + logger.info("creating eventhub receiver: partitionId=" + partitionId + + ", offset=" + offset); long start = System.currentTimeMillis(); - receiver = new ResilientEventHubReceiver(connectionString, entityName, - partitionId, consumerGroupName, defaultCredits, filter); - receiver.initialize(); - + try { + ehClient = EventHubClient.createFromConnectionStringSync(connectionString); + receiver = ehClient.createEpochReceiverSync( + consumerGroupName, + partitionId, + offset, + false, + 1); + }catch (Exception e){ + logger.info("Exception in creating EventhubClient"+e.toString()); + } long end = System.currentTimeMillis(); logger.info("created eventhub receiver, time taken(ms): " + (end-start)); } @Override - public void close() { + public void close(){ if(receiver != null) { - receiver.close(); + try { + receiver.close().whenComplete((voidargs,error)->{ + try{ + if(error!=null){ + logger.error("Exception during receiver close phase"+error.toString()); + } + ehClient.closeSync(); + }catch (Exception e){ + logger.error("Exception during ehclient close phase"+e.toString()); + } + }).get(); + }catch (InterruptedException e){ + logger.error("Exception occured during close phase"+e.toString()); + }catch (ExecutionException e){ + logger.error("Exception occured during close phase"+e.toString()); + } logger.info("closed eventhub receiver: partitionId=" + partitionId ); receiver = null; + ehClient = null; } } + @Override public boolean isOpen() { @@ -92,50 +111,29 @@ public boolean isOpen() { } @Override - public EventData receive(long timeoutInMilliseconds) { + public EventDataWrap receive() { long start = System.currentTimeMillis(); - Message message = receiver.receive(timeoutInMilliseconds); + Iterable receivedEvents=null; + /*Get one message at a time for backward compatibility behaviour*/ + try { + receivedEvents = receiver.receiveSync(1); + }catch (Exception e){ + logger.error("Exception occured during receive"+e.toString()); + } long end = System.currentTimeMillis(); long millis = (end - start); receiveApiLatencyMean.update(millis); receiveApiCallCount.incr(); - - if (message == null) { - //Temporary workaround for AMQP/EH bug of failing to receive messages - /*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) { - throw new RuntimeException( - "Restart EventHubSpout due to failure of receiving messages in " - + millis + " millisecond"); - }*/ + if (receivedEvents == null) { return null; } - receiveMessageCount.incr(); + EventData receivedEvent = receivedEvents.iterator().next(); + MessageId messageId = new MessageId(partitionId, + receivedEvent.getSystemProperties().getOffset(), + receivedEvent.getSystemProperties().getSequenceNumber()); - MessageId messageId = createMessageId(message); - return EventData.create(message, messageId); - } - - private MessageId createMessageId(Message message) { - String offset = null; - long sequenceNumber = 0; - - for (Section section : message.getPayload()) { - if (section instanceof MessageAnnotations) { - MessageAnnotations annotations = (MessageAnnotations) section; - HashMap annonationMap = (HashMap) annotations.getValue(); - - if (annonationMap.containsKey(OffsetKey)) { - offset = (String) annonationMap.get(OffsetKey); - } - - if (annonationMap.containsKey(SequenceNumberKey)) { - sequenceNumber = (Long) annonationMap.get(SequenceNumberKey); - } - } - } - - return MessageId.create(partitionId, offset, sequenceNumber); + return EventDataWrap.create(receivedEvent,messageId); } @Override diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java index 6adef420509..b14e3f6bf4c 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java @@ -24,6 +24,7 @@ import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; +import com.microsoft.azure.eventhubs.EventData; import java.util.HashMap; import java.util.List; @@ -174,7 +175,7 @@ public Object getValueAndReset() { @Override public void nextTuple() { - EventData eventData = null; + EventDataWrap eventDatawrap = null; List partitionManagers = partitionCoordinator.getMyPartitionManagers(); for (int i = 0; i < partitionManagers.size(); i++) { @@ -185,20 +186,16 @@ public void nextTuple() { throw new RuntimeException("partitionManager doesn't exist."); } - eventData = partitionManager.receive(); + eventDatawrap = partitionManager.receive(); - if (eventData != null) { + if (eventDatawrap != null) { break; } } - - if (eventData != null) { - MessageId messageId = eventData.getMessageId(); - Message message = eventData.getMessage(); - - List tuples = scheme.deserialize(message); - + if (eventDatawrap != null) { + MessageId messageId = eventDatawrap.getMessageId(); + List tuples = scheme.deserialize(eventDatawrap.getEventData()); if (tuples != null) { collector.emit(tuples, messageId); } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java index e06953aa05b..3bbfa525033 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java @@ -20,7 +20,8 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import com.microsoft.eventhubs.client.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.servicebus.ConnectionStringBuilder; public class EventHubSpoutConfig implements Serializable { private static final long serialVersionUID = 1L; @@ -42,8 +43,7 @@ public class EventHubSpoutConfig implements Serializable { private String connectionString; private String topologyName; private IEventDataScheme scheme = new StringEventDataScheme(); - private String consumerGroupName = null; // if null then use default - // consumer group + private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME; private String outputStreamId; @@ -52,8 +52,8 @@ public EventHubSpoutConfig(String username, String password, String namespace, String entityPath, int partitionCount) { this.userName = username; this.password = password; - this.connectionString = new ConnectionStringBuilder(username, password, - namespace).getConnectionString(); + this.connectionString = new ConnectionStringBuilder(namespace,entityPath, + username,password).toString(); this.namespace = namespace; this.entityPath = entityPath; this.partitionCount = partitionCount; @@ -232,9 +232,12 @@ public String getConnectionString() { return connectionString; } + /*Keeping it for backward compatibility*/ public void setTargetAddress(String targetFqnAddress) { - this.connectionString = new ConnectionStringBuilder(userName, password, - namespace, targetFqnAddress).getConnectionString(); + } + + public void setTargetAddress(){ + } public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) { diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java index b8101b966ab..a16a992cdc8 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java @@ -20,7 +20,7 @@ import org.apache.storm.tuple.Fields; import java.io.Serializable; import java.util.List; -import org.apache.qpid.amqp_1_0.client.Message; +import com.microsoft.azure.eventhubs.EventData; public interface IEventDataScheme extends Serializable { @@ -29,10 +29,10 @@ public interface IEventDataScheme extends Serializable { * * @see #getOutputFields() for the list of fields the tuple will contain. * - * @param message The Message to Deserialize. + * @param eventData The EventData to Deserialize. * @return A tuple containing the deserialized fields of the message. */ - List deserialize(Message message); + List deserialize(EventData eventData); /** * Retrieve the Fields that are present on tuples created by this object. diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java index bc2db14101f..8f51ec1f84f 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java @@ -20,17 +20,16 @@ import java.util.Map; import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.eventhubs.client.IEventHubFilter; public interface IEventHubReceiver { - void open(IEventHubFilter filter) throws EventHubException; + void open(String offset) throws EventHubException; void close(); - + boolean isOpen(); - EventData receive(long timeoutInMilliseconds); - + EventDataWrap receive(); + Map getMetricsData(); } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java index ac986d3b7cb..845f50815bc 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java @@ -25,7 +25,7 @@ public interface IPartitionManager { void close(); - EventData receive(); + EventDataWrap receive(); void checkpoint(); diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java index 105474209e5..20e021a8a8a 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java @@ -29,9 +29,9 @@ public class PartitionManager extends SimplePartitionManager { private final int ehReceiveTimeoutMs = 5000; //all sent events are stored in pending - private final Map pending; + private final Map pending; //all failed events are put in toResend, which is sorted by event's offset - private final TreeSet toResend; + private final TreeSet toResend; public PartitionManager( EventHubSpoutConfig spoutConfig, @@ -41,29 +41,29 @@ public PartitionManager( super(spoutConfig, partitionId, stateStore, receiver); - this.pending = new LinkedHashMap(); - this.toResend = new TreeSet(); + this.pending = new LinkedHashMap(); + this.toResend = new TreeSet(); } @Override - public EventData receive() { + public EventDataWrap receive() { if(pending.size() >= config.getMaxPendingMsgsPerPartition()) { return null; } - EventData eventData; + EventDataWrap eventDatawrap; if (toResend.isEmpty()) { - eventData = receiver.receive(ehReceiveTimeoutMs); + eventDatawrap = receiver.receive(); } else { - eventData = toResend.pollFirst(); + eventDatawrap = toResend.pollFirst(); } - if (eventData != null) { - lastOffset = eventData.getMessageId().getOffset(); - pending.put(lastOffset, eventData); + if (eventDatawrap != null) { + lastOffset = eventDatawrap.getMessageId().getOffset(); + pending.put(lastOffset, eventDatawrap); } - return eventData; + return eventDatawrap; } @Override @@ -74,8 +74,8 @@ public void ack(String offset) { @Override public void fail(String offset) { logger.warn("fail on " + offset); - EventData eventData = pending.remove(offset); - toResend.add(eventData); + EventDataWrap eventDataWrap = pending.remove(offset); + toResend.add(eventDataWrap); } @Override diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java index b66a7850818..9aa2736f9f6 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java @@ -26,7 +26,7 @@ import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter; import com.microsoft.eventhubs.client.EventHubOffsetFilter; import com.microsoft.eventhubs.client.IEventHubFilter; - +import com.microsoft.azure.eventhubs.EventData; /** * A simple partition manager that does not re-send failed messages */ @@ -65,16 +65,13 @@ public void open() throws Exception { offset = Constants.DefaultStartingOffset; } - IEventHubFilter filter; if (offset.equals(Constants.DefaultStartingOffset) && config.getEnqueueTimeFilter() != 0) { - filter = new EventHubEnqueueTimeFilter(config.getEnqueueTimeFilter()); - } - else { - filter = new EventHubOffsetFilter(offset); + offset = Long.toString(config.getEnqueueTimeFilter()); } - receiver.open(filter); + + receiver.open(offset); } @Override @@ -98,12 +95,12 @@ protected String getCompletedOffset() { } @Override - public EventData receive() { - EventData eventData = receiver.receive(5000); - if (eventData != null) { - lastOffset = eventData.getMessageId().getOffset(); + public EventDataWrap receive() { + EventDataWrap eventDatawrap = receiver.receive(); + if (eventDatawrap != null) { + lastOffset = eventDatawrap.getEventData().getSystemProperties().getOffset(); } - return eventData; + return eventDatawrap; } @Override diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java index 0c6f8b6eda9..35f10c15f64 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java @@ -19,17 +19,10 @@ import org.apache.storm.tuple.Fields; +import java.nio.charset.Charset; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; - -import org.apache.qpid.amqp_1_0.client.Message; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.storm.tuple.Fields; +import com.microsoft.azure.eventhubs.EventData; /** * An Event Data Scheme which deserializes message payload into the Strings. @@ -46,19 +39,11 @@ public class StringEventDataScheme implements IEventDataScheme { private static final long serialVersionUID = 1L; @Override - public List deserialize(Message message) { + public List deserialize(EventData eventData) { final List fieldContents = new ArrayList(); - - for (Section section : message.getPayload()) { - if (section instanceof Data) { - Data data = (Data) section; - fieldContents.add(new String(data.getValue().getArray())); - } else if (section instanceof AmqpValue) { - AmqpValue amqpValue = (AmqpValue) section; - fieldContents.add(amqpValue.getValue().toString()); - } - } - + String messageData = ""; + messageData = new String (eventData.getBody(),eventData.getBodyOffset(),eventData.getBodyLength(),Charset.defaultCharset()); + fieldContents.add(messageData); return fieldContents; } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java index fbe779d0627..069d819b855 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java @@ -17,9 +17,9 @@ *******************************************************************************/ package org.apache.storm.eventhubs.trident; -import java.util.List; +import org.apache.storm.eventhubs.spout.EventDataWrap; -import org.apache.storm.eventhubs.spout.EventData; +import java.util.List; public interface ITridentPartitionManager { boolean open(String offset); @@ -31,5 +31,5 @@ public interface ITridentPartitionManager { * @param count max number of messages in this batch * @return list of EventData, if failed to receive, return empty list */ - public List receiveBatch(String offset, int count); + public List receiveBatch(String offset, int count); } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java index e5c1c5093c7..dc6e5bb9f63 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java @@ -24,7 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.eventhubs.spout.EventData; +import org.apache.storm.eventhubs.spout.EventDataWrap; import org.apache.storm.eventhubs.spout.EventHubReceiverImpl; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventHubReceiver; @@ -111,15 +111,15 @@ public void emitPartitionBatch(TransactionAttempt attempt, int count = Integer.parseInt((String)meta.get("count")); logger.info("re-emit for partition " + partition.getId() + ", offset=" + offset + ", count=" + count); ITridentPartitionManager pm = getOrCreatePartitionManager(partition); - List listEvents = pm.receiveBatch(offset, count); + List listEvents = pm.receiveBatch(offset, count); if(listEvents.size() != count) { logger.error("failed to refetch eventhub messages, new count=" + listEvents.size()); return; } - for(EventData ed: listEvents) { + for(EventDataWrap ed: listEvents) { List tuples = - spoutConfig.getEventDataScheme().deserialize(ed.getMessage()); + spoutConfig.getEventDataScheme().deserialize(ed.getEventData()); collector.emit(tuples); } } @@ -135,13 +135,13 @@ public Map emitPartitionBatchNew(TransactionAttempt attempt, //logger.info("emit for partition " + partition.getId() + ", offset=" + offset); String nextOffset = offset; - List listEvents = pm.receiveBatch(offset, batchSize); + List listEvents = pm.receiveBatch(offset, batchSize); - for(EventData ed: listEvents) { + for(EventDataWrap ed: listEvents) { //update nextOffset; nextOffset = ed.getMessageId().getOffset(); List tuples = - spoutConfig.getEventDataScheme().deserialize(ed.getMessage()); + spoutConfig.getEventDataScheme().deserialize(ed.getEventData()); collector.emit(tuples); } //logger.info("emitted new batches: " + listEvents.size()); diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java index 159fe418c9d..52574c57933 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java @@ -20,15 +20,13 @@ import java.util.ArrayList; import java.util.List; +import org.apache.storm.eventhubs.spout.EventDataWrap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.microsoft.eventhubs.client.Constants; -import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter; import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.eventhubs.client.EventHubOffsetFilter; -import org.apache.storm.eventhubs.spout.EventData; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventHubReceiver; @@ -49,10 +47,10 @@ public boolean open(String offset) { try { if((offset == null || offset.equals(Constants.DefaultStartingOffset)) && spoutConfig.getEnqueueTimeFilter() != 0) { - receiver.open(new EventHubEnqueueTimeFilter(spoutConfig.getEnqueueTimeFilter())); + receiver.open(offset); } else { - receiver.open(new EventHubOffsetFilter(offset)); + receiver.open(offset); } lastOffset = offset; return true; @@ -69,8 +67,8 @@ public void close() { } @Override - public List receiveBatch(String offset, int count) { - List batch = new ArrayList(count); + public List receiveBatch(String offset, int count) { + List batch = new ArrayList(count); if(!offset.equals(lastOffset) || !receiver.isOpen()) { //re-establish connection to eventhub servers using the right offset //TBD: might be optimized with cache. @@ -81,7 +79,7 @@ public List receiveBatch(String offset, int count) { } for(int i=0; i body = new ArrayList
(); + //the body of the message is "message" + currentOffset, e.g. "message123" - body.add(new Data(new Binary(("message" + currentOffset).getBytes()))); - Message m = new Message(body); + MessageId mid = new MessageId(partitionId, "" + currentOffset, currentOffset); - EventData ed = new EventData(m, mid); - return ed; + EventData ed = new EventData(("message" + currentOffset).getBytes()); + return EventDataWrap.create(ed,mid); } @Override diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java index dd63d5d23be..467461c9f3c 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java @@ -17,10 +17,6 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import org.apache.storm.eventhubs.spout.PartitionManager; -import org.apache.storm.eventhubs.spout.EventData; -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; - /** * This mock exercises PartitionManager */ @@ -71,7 +67,7 @@ public String execute(String callSequence) { count = Integer.parseInt(cmd.substring(1)); } for(int i=0; i 0); } diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java index 49e544bfa16..3a2d0f6a9e7 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java @@ -40,7 +40,7 @@ public void testSpoutConfig() { "namespace", "entityname", 16); conf.setZkConnectionString("zookeeper"); conf.setCheckpointIntervalInSeconds(1); - assertEquals(conf.getConnectionString(), "amqps://username:pas%5Cs%2Bw%2Ford@namespace.servicebus.windows.net"); + assertEquals(conf.getConnectionString(), "Endpoint=amqps://namespace.servicebus.windows.net;EntityPath=entityname;SharedAccessKeyName=username;SharedAccessKey=pas\\s+w/ord;OperationTimeout=PT1M;RetryPolicy=Default"); } @Test From bb792715387820aac0756738fddbe9a350d7d8dc Mon Sep 17 00:00:00 2001 From: Ranjan Banerjee Date: Mon, 13 Mar 2017 14:25:41 -0700 Subject: [PATCH 2/2] applying changes from comments --- external/storm-eventhubs/pom.xml | 5 +++++ .../storm/eventhubs/bolt/EventHubBolt.java | 15 ++++++------- .../eventhubs/bolt/EventHubBoltConfig.java | 6 ++--- .../spout/BinaryEventDataScheme.java | 7 ++---- .../eventhubs/spout/EventDataScheme.java | 10 ++++----- .../eventhubs/spout/EventHubReceiverImpl.java | 22 +++++++++---------- .../storm/eventhubs/spout/EventHubSpout.java | 14 +++--------- .../eventhubs/spout/EventHubSpoutConfig.java | 15 +++---------- .../eventhubs/spout/IEventDataScheme.java | 3 ++- .../eventhubs/spout/IEventHubReceiver.java | 4 ++-- .../spout/SimplePartitionManager.java | 9 ++------ .../spout/StringEventDataScheme.java | 5 +++-- .../TransactionalTridentEventHubEmitter.java | 21 ++++++------------ .../trident/TridentPartitionManager.java | 14 +++++------- .../eventhubs/spout/EventHubReceiverMock.java | 5 ++--- .../storm/eventhubs/spout/TestEventData.java | 4 ++-- .../eventhubs/spout/TestEventHubSpout.java | 4 ++-- pom.xml | 1 + 18 files changed, 67 insertions(+), 97 deletions(-) diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml index a4aecfba2b0..2e54a561177 100755 --- a/external/storm-eventhubs/pom.xml +++ b/external/storm-eventhubs/pom.xml @@ -99,6 +99,11 @@ azure-eventhubs 0.11.0 + + com.microsoft.azure + azure-eventhubs + ${azure-eventhubs.version} + org.apache.storm storm-core diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index 79ce0b1e6cc..79fb52f4bbd 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -17,22 +17,21 @@ *******************************************************************************/ package org.apache.storm.eventhubs.bolt; -import java.util.Map; -import java.util.concurrent.ExecutionException; - -import com.microsoft.azure.servicebus.ServiceBusException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.microsoft.azure.eventhubs.EventData; -import com.microsoft.azure.eventhubs.PartitionSender; import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.PartitionSender; +import com.microsoft.azure.servicebus.ServiceBusException; import com.microsoft.eventhubs.client.EventHubException; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ExecutionException; /** * A bolt that writes event message to EventHub. diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java index 41a2c3691e7..6177c2e5341 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java @@ -17,10 +17,10 @@ *******************************************************************************/ package org.apache.storm.eventhubs.bolt; -import java.io.Serializable; - -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import com.microsoft.azure.servicebus.ConnectionStringBuilder; +import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; + +import java.io.Serializable; /* * EventHubs bolt configurations diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java index 958597ed7c7..c4ced204e03 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java @@ -18,17 +18,14 @@ package org.apache.storm.eventhubs.spout; import com.microsoft.azure.eventhubs.EventData; -import org.apache.qpid.amqp_1_0.client.Message; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Data; import org.apache.storm.tuple.Fields; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; + + /** * An Event Data Scheme which deserializes message payload into the raw bytes. * diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java index 5e67b9e2fd6..b04081cbe6f 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java @@ -17,13 +17,13 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; +import com.microsoft.azure.eventhubs.EventData; import org.apache.storm.tuple.Fields; + +import java.nio.charset.Charset; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.nio.charset.Charset; -import com.microsoft.azure.eventhubs.EventData; /** * An Event Data Scheme which deserializes message payload into the Strings. No @@ -40,12 +40,12 @@ public class EventDataScheme implements IEventDataScheme { private static final long serialVersionUID = 1L; - @Override public List deserialize(EventData eventData) { final List fieldContents = new ArrayList(); String messageData = ""; - messageData = new String (eventData.getBody(),eventData.getBodyOffset(),eventData.getBodyLength(),Charset.defaultCharset()); + if(eventData.getBody()!=null) + messageData = new String (eventData.getBody(),eventData.getBodyOffset(),eventData.getBodyLength(),Charset.defaultCharset()); Map metaDataMap = eventData.getProperties(); fieldContents.add(messageData); fieldContents.add(metaDataMap); diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java index 56c225fdbb4..069a030a1f0 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java @@ -17,23 +17,20 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; +import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.azure.eventhubs.PartitionReceiver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.microsoft.azure.servicebus.ServiceBusException; +import com.microsoft.eventhubs.client.EventHubException; import org.apache.storm.metric.api.CountMetric; import org.apache.storm.metric.api.MeanReducer; import org.apache.storm.metric.api.ReducedMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.microsoft.eventhubs.client.Constants; -import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.azure.eventhubs.EventData; - -import java.util.concurrent.ExecutionException; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; public class EventHubReceiverImpl implements IEventHubReceiver { private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class); @@ -104,7 +101,7 @@ public void close(){ } } - + @Override public boolean isOpen() { return (receiver != null); @@ -117,14 +114,15 @@ public EventDataWrap receive() { /*Get one message at a time for backward compatibility behaviour*/ try { receivedEvents = receiver.receiveSync(1); - }catch (Exception e){ + }catch (ServiceBusException e){ logger.error("Exception occured during receive"+e.toString()); } long end = System.currentTimeMillis(); long millis = (end - start); receiveApiLatencyMean.update(millis); receiveApiCallCount.incr(); - if (receivedEvents == null) { + + if (receivedEvents == null || receivedEvents.spliterator().getExactSizeIfKnown() == 0) { return null; } receiveMessageCount.incr(); diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java index b14e3f6bf4c..ac3279ddd63 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java @@ -17,24 +17,20 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import com.google.common.base.Strings; import org.apache.storm.Config; import org.apache.storm.metric.api.IMetric; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; -import com.microsoft.azure.eventhubs.EventData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpid.amqp_1_0.client.Message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class EventHubSpout extends BaseRichSpout { private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class); @@ -240,11 +236,7 @@ public void close() { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (Strings.isNullOrEmpty(eventHubConfig.getOutputStreamId())) { - declarer.declare(scheme.getOutputFields()); - } else { - declarer.declareStream(eventHubConfig.getOutputStreamId(), scheme.getOutputFields()); - } + declarer.declare(scheme.getOutputFields()); } private void checkpointIfNeeded() { diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java index 3bbfa525033..62c61cd6de4 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java @@ -17,11 +17,12 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.servicebus.ConnectionStringBuilder; + import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import com.microsoft.azure.eventhubs.EventHubClient; -import com.microsoft.azure.servicebus.ConnectionStringBuilder; public class EventHubSpoutConfig implements Serializable { private static final long serialVersionUID = 1L; @@ -44,8 +45,6 @@ public class EventHubSpoutConfig implements Serializable { private String topologyName; private IEventDataScheme scheme = new StringEventDataScheme(); private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME; - private String outputStreamId; - // These are mandatory parameters public EventHubSpoutConfig(String username, String password, @@ -244,12 +243,4 @@ public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) { setTargetAddress(targetFqnAddress); return this; } - - public String getOutputStreamId() { - return outputStreamId; - } - - public void setOutputStreamId(String outputStreamId) { - this.outputStreamId = outputStreamId; - } } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java index a16a992cdc8..6c7852410ca 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java @@ -17,10 +17,11 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; +import com.microsoft.azure.eventhubs.EventData; import org.apache.storm.tuple.Fields; + import java.io.Serializable; import java.util.List; -import com.microsoft.azure.eventhubs.EventData; public interface IEventDataScheme extends Serializable { diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java index 8f51ec1f84f..d094ca00ec8 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java @@ -17,10 +17,10 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import java.util.Map; - import com.microsoft.eventhubs.client.EventHubException; +import java.util.Map; + public interface IEventHubReceiver { void open(String offset) throws EventHubException; diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java index 9aa2736f9f6..9acb0a5fdf3 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java @@ -17,16 +17,11 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import java.util.Map; - +import com.microsoft.eventhubs.client.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.microsoft.eventhubs.client.Constants; -import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter; -import com.microsoft.eventhubs.client.EventHubOffsetFilter; -import com.microsoft.eventhubs.client.IEventHubFilter; -import com.microsoft.azure.eventhubs.EventData; +import java.util.Map; /** * A simple partition manager that does not re-send failed messages */ diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java index 35f10c15f64..e331f4c9b1a 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java @@ -17,12 +17,12 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; +import com.microsoft.azure.eventhubs.EventData; import org.apache.storm.tuple.Fields; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; -import com.microsoft.azure.eventhubs.EventData; /** * An Event Data Scheme which deserializes message payload into the Strings. @@ -42,7 +42,8 @@ public class StringEventDataScheme implements IEventDataScheme { public List deserialize(EventData eventData) { final List fieldContents = new ArrayList(); String messageData = ""; - messageData = new String (eventData.getBody(),eventData.getBodyOffset(),eventData.getBodyLength(),Charset.defaultCharset()); + if(eventData.getBody()!=null) + messageData = new String (eventData.getBody(),eventData.getBodyOffset(),eventData.getBodyLength(),Charset.defaultCharset()); fieldContents.add(messageData); return fieldContents; } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java index dc6e5bb9f63..50be32075d3 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java @@ -17,24 +17,17 @@ *******************************************************************************/ package org.apache.storm.eventhubs.trident; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.eventhubs.spout.EventDataWrap; -import org.apache.storm.eventhubs.spout.EventHubReceiverImpl; -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; -import org.apache.storm.eventhubs.spout.IEventHubReceiver; -import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory; import com.microsoft.eventhubs.client.Constants; - +import org.apache.storm.eventhubs.spout.*; import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; import org.apache.storm.trident.spout.IPartitionedTridentSpout; import org.apache.storm.trident.topology.TransactionAttempt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class TransactionalTridentEventHubEmitter diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java index 52574c57933..bc80b8b5f92 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java @@ -17,18 +17,16 @@ *******************************************************************************/ package org.apache.storm.eventhubs.trident; -import java.util.ArrayList; -import java.util.List; - -import org.apache.storm.eventhubs.spout.EventDataWrap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.microsoft.eventhubs.client.Constants; import com.microsoft.eventhubs.client.EventHubException; - +import org.apache.storm.eventhubs.spout.EventDataWrap; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventHubReceiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; public class TridentPartitionManager implements ITridentPartitionManager { private static final Logger logger = LoggerFactory.getLogger(TridentPartitionManager.class); diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java index 55b96b3b08f..607776641cc 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/EventHubReceiverMock.java @@ -17,12 +17,11 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import java.util.Map; import com.microsoft.azure.eventhubs.EventData; - - import com.microsoft.eventhubs.client.EventHubException; +import java.util.Map; + /** * A mock receiver that emits fake data with offset starting from given offset * and increase by 1 each time. diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java index 9fffb039f68..f260deacbc6 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventData.java @@ -17,12 +17,12 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import static org.junit.Assert.*; - import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertTrue; + public class TestEventData { @Before diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java index 3a2d0f6a9e7..e4ddea33cba 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java @@ -17,12 +17,12 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import static org.junit.Assert.*; - import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class TestEventHubSpout { diff --git a/pom.xml b/pom.xml index b9fe6aaf732..f6ab5ba416e 100644 --- a/pom.xml +++ b/pom.xml @@ -297,6 +297,7 @@ 3.1.0 1.0 0.32 + 0.11.0 1.0.1 2.24.1