diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml index 21738a554cd..e44050dd071 100755 --- a/external/storm-eventhubs/pom.xml +++ b/external/storm-eventhubs/pom.xml @@ -94,6 +94,11 @@ eventhubs-client ${eventhubs.client.version} + + com.microsoft.azure + azure-eventhubs + ${azure-eventhubs.version} + org.apache.storm storm-core diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index 3d64cc5624e..79fb52f4bbd 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -17,20 +17,21 @@ *******************************************************************************/ package org.apache.storm.eventhubs.bolt; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.microsoft.eventhubs.client.EventHubClient; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.PartitionSender; +import com.microsoft.azure.servicebus.ServiceBusException; import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.eventhubs.client.EventHubSender; - import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ExecutionException; /** * A bolt that writes event message to EventHub. @@ -41,7 +42,8 @@ public class EventHubBolt extends BaseRichBolt { .getLogger(EventHubBolt.class); protected OutputCollector collector; - protected EventHubSender sender; + protected PartitionSender sender=null; + protected EventHubClient ehClient=null; protected EventHubBoltConfig boltConfig; public EventHubBolt(String connectionString, String entityPath) { @@ -70,10 +72,9 @@ public void prepare(Map config, TopologyContext context, logger.info("creating sender: " + boltConfig.getConnectionString() + ", " + boltConfig.getEntityPath() + ", " + myPartitionId); try { - EventHubClient eventHubClient = EventHubClient.create( - boltConfig.getConnectionString(), - boltConfig.getEntityPath()); - sender = eventHubClient.createPartitionSender(myPartitionId); + ehClient = EventHubClient.createFromConnectionStringSync(boltConfig.getConnectionString()); + if (boltConfig.getPartitionMode()) + sender = ehClient.createPartitionSenderSync(Integer.toString(context.getThisTaskIndex())); } catch (Exception ex) { collector.reportError(ex); throw new RuntimeException(ex); @@ -84,11 +85,47 @@ public void prepare(Map config, TopologyContext context, @Override public void execute(Tuple tuple) { try { - sender.send(boltConfig.getEventDataFormat().serialize(tuple)); + EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple)); + if(boltConfig.getPartitionMode() && sender!=null) + sender.sendSync(sendEvent); + else if(boltConfig.getPartitionMode() && sender==null) + throw new EventHubException("Sender is null"); + else if(!boltConfig.getPartitionMode() && ehClient!=null) + ehClient.sendSync(sendEvent); + else if(!boltConfig.getPartitionMode() && ehClient==null) + throw new EventHubException("ehclient is null"); collector.ack(tuple); - } catch (EventHubException ex) { + } catch (EventHubException ex ) { collector.reportError(ex); collector.fail(tuple); + }catch (ServiceBusException e){ + collector.reportError(e); + collector.fail(tuple); + } + } + + @Override + public void cleanup() { + if(sender != null) { + try { + sender.close().whenComplete((voidargs,error)->{ + try{ + if(error!=null){ + logger.error("Exception during sender cleanup phase"+error.toString()); + } + ehClient.closeSync(); + }catch (Exception e){ + logger.error("Exception during ehclient cleanup phase"+e.toString()); + } + }).get(); + }catch (InterruptedException e){ + logger.error("Exception occured during cleanup phase"+e.toString()); + }catch (ExecutionException e){ + logger.error("Exception occured during cleanup phase"+e.toString()); + } + logger.info("Eventhub Bolt cleaned up"); + sender = null; + ehClient = null; } } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java index 10b4e393e7c..6177c2e5341 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBoltConfig.java @@ -17,10 +17,10 @@ *******************************************************************************/ package org.apache.storm.eventhubs.bolt; -import java.io.Serializable; - +import com.microsoft.azure.servicebus.ConnectionStringBuilder; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; -import com.microsoft.eventhubs.client.ConnectionStringBuilder; + +import java.io.Serializable; /* * EventHubs bolt configurations @@ -81,8 +81,8 @@ public EventHubBoltConfig(String userName, String password, String namespace, public EventHubBoltConfig(String userName, String password, String namespace, String targetFqnAddress, String entityPath, boolean partitionMode, IEventDataFormat dataFormat) { - this.connectionString = new ConnectionStringBuilder(userName, password, - namespace, targetFqnAddress).getConnectionString(); + this.connectionString = new ConnectionStringBuilder(namespace,entityPath, + userName,password).toString(); this.entityPath = entityPath; this.partitionMode = partitionMode; this.dataFormat = dataFormat; diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java index 7b0d7e54845..ef74f42c270 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java @@ -17,17 +17,17 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import org.apache.qpid.amqp_1_0.client.Message; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Data; +import com.microsoft.azure.eventhubs.EventData; import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; + /** * An Event Data Scheme which deserializes message payload into the raw bytes. * @@ -37,23 +37,21 @@ */ public class BinaryEventDataScheme implements IEventDataScheme { + private static final Logger logger = LoggerFactory.getLogger(BinaryEventDataScheme.class); @Override - public List deserialize(Message message) { + public List deserialize(EventData eventData){ final List fieldContents = new ArrayList(); - - Map metaDataMap = new HashMap(); - byte[] messageData = new byte[0]; - - for (Section section : message.getPayload()) { - if (section instanceof Data) { - Data data = (Data) section; - messageData = data.getValue().getArray(); - } else if (section instanceof ApplicationProperties) { - final ApplicationProperties applicationProperties = (ApplicationProperties) section; - metaDataMap = applicationProperties.getValue(); + byte [] messageData = null; + if(eventData.getBytes() != null) + messageData = eventData.getBytes(); + else if(eventData.getObject()!=null) { + try { + messageData = Serializedeserializeutil.serialize(eventData.getObject()); + }catch (IOException e){ + logger.error("Failed to serialize object"+e.toString()); } } - + Map metaDataMap = eventData.getProperties(); fieldContents.add(messageData); fieldContents.add(metaDataMap); return fieldContents; diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java index 90cad0a899e..731b7e196e0 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java @@ -17,18 +17,17 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; +import com.microsoft.azure.eventhubs.EventData; import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.Charset; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.qpid.amqp_1_0.client.Message; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Data; - /** * An Event Data Scheme which deserializes message payload into the Strings. No * encoding is assumed. The receiver will need to handle parsing of the string @@ -37,34 +36,28 @@ * The resulting tuple would contain two items: the the message string, and a * map of properties that include metadata, which can be used to determine who * processes the message, and how it is processed. - * + * * For passing the raw bytes of a messsage to Bolts, refer to * {@link BinaryEventDataScheme}. */ public class EventDataScheme implements IEventDataScheme { private static final long serialVersionUID = 1L; - + private static final Logger logger = LoggerFactory.getLogger(EventDataScheme.class); @Override - public List deserialize(Message message) { + public List deserialize(EventData eventData) { final List fieldContents = new ArrayList(); - - Map metaDataMap = new HashMap(); String messageData = ""; - - for (Section section : message.getPayload()) { - if (section instanceof Data) { - Data data = (Data) section; - messageData = new String(data.getValue().getArray()); - } else if (section instanceof AmqpValue) { - AmqpValue amqpValue = (AmqpValue) section; - messageData = amqpValue.getValue().toString(); - } else if (section instanceof ApplicationProperties) { - final ApplicationProperties applicationProperties = (ApplicationProperties) section; - metaDataMap = applicationProperties.getValue(); + if(eventData.getBytes()!=null) + messageData = new String (eventData.getBytes()); + else if(eventData.getObject()!=null){ + try{ + messageData = new String(Serializedeserializeutil.serialize(eventData.getObject()),Charset.defaultCharset()); + }catch (IOException e){ + logger.error("Failed to serialize object"+e.toString()); } } - + Map metaDataMap = eventData.getProperties(); fieldContents.add(messageData); fieldContents.add(metaDataMap); return fieldContents; diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java old mode 100755 new mode 100644 similarity index 72% rename from external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java rename to external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java index e5834b4f561..5eeb4d26956 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventData.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataWrap.java @@ -1,48 +1,48 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.storm.eventhubs.spout; - -import org.apache.qpid.amqp_1_0.client.Message; - -public class EventData implements Comparable { - private final Message message; - private final MessageId messageId; - - public EventData(Message message, MessageId messageId) { - this.message = message; - this.messageId = messageId; - } - - public static EventData create(Message message, MessageId messageId) { - return new EventData(message, messageId); - } - - public Message getMessage() { - return this.message; - } - - public MessageId getMessageId() { - return this.messageId; - } - - @Override - public int compareTo(EventData ed) { - return messageId.getSequenceNumber(). - compareTo(ed.getMessageId().getSequenceNumber()); - } -} +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.storm.eventhubs.spout; + +import com.microsoft.azure.eventhubs.EventData; + +public class EventDataWrap implements Comparable { + private final EventData eventData; + private final MessageId messageId; + + public EventDataWrap(EventData eventdata, MessageId messageId) { + this.eventData = eventdata; + this.messageId = messageId; + } + + public static EventDataWrap create(EventData eventData, MessageId messageId) { + return new EventDataWrap(eventData, messageId); + } + + public EventData getEventData() { + return this.eventData; + } + + public MessageId getMessageId() { + return this.messageId; + } + + @Override + public int compareTo(EventDataWrap ed) { + return messageId.getSequenceNumber(). + compareTo(ed.getMessageId().getSequenceNumber()); + } +} diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java index 5f9acbd344c..069a030a1f0 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubReceiverImpl.java @@ -17,38 +17,31 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import org.apache.qpid.amqp_1_0.client.Message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.PartitionReceiver; +import com.microsoft.azure.servicebus.ServiceBusException; +import com.microsoft.eventhubs.client.EventHubException; import org.apache.storm.metric.api.CountMetric; import org.apache.storm.metric.api.MeanReducer; import org.apache.storm.metric.api.ReducedMetric; - -import com.microsoft.eventhubs.client.Constants; -import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.eventhubs.client.IEventHubFilter; -import com.microsoft.eventhubs.client.ResilientEventHubReceiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; - -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; +import java.util.concurrent.ExecutionException; public class EventHubReceiverImpl implements IEventHubReceiver { private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class); - private static final Symbol OffsetKey = Symbol.valueOf(Constants.OffsetKey); - private static final Symbol SequenceNumberKey = Symbol.valueOf(Constants.SequenceNumberKey); private final String connectionString; private final String entityName; private final String partitionId; - private final int defaultCredits; private final String consumerGroupName; - private ResilientEventHubReceiver receiver; + private PartitionReceiver receiver; + private EventHubClient ehClient=null; private ReducedMetric receiveApiLatencyMean; private CountMetric receiveApiCallCount; private CountMetric receiveMessageCount; @@ -56,7 +49,6 @@ public class EventHubReceiverImpl implements IEventHubReceiver { public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) { this.connectionString = config.getConnectionString(); this.entityName = config.getEntityPath(); - this.defaultCredits = config.getReceiverCredits(); this.partitionId = partitionId; this.consumerGroupName = config.getConsumerGroupName(); receiveApiLatencyMean = new ReducedMetric(new MeanReducer()); @@ -65,77 +57,81 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) { } @Override - public void open(IEventHubFilter filter) throws EventHubException { - logger.info("creating eventhub receiver: partitionId=" + partitionId + - ", filterString=" + filter.getFilterString()); + public void open(String offset) throws EventHubException { + logger.info("creating eventhub receiver: partitionId=" + partitionId + + ", offset=" + offset); long start = System.currentTimeMillis(); - receiver = new ResilientEventHubReceiver(connectionString, entityName, - partitionId, consumerGroupName, defaultCredits, filter); - receiver.initialize(); - + try { + ehClient = EventHubClient.createFromConnectionStringSync(connectionString); + receiver = ehClient.createEpochReceiverSync( + consumerGroupName, + partitionId, + offset, + false, + 1); + }catch (Exception e){ + logger.info("Exception in creating EventhubClient"+e.toString()); + } long end = System.currentTimeMillis(); logger.info("created eventhub receiver, time taken(ms): " + (end-start)); } @Override - public void close() { + public void close(){ if(receiver != null) { - receiver.close(); + try { + receiver.close().whenComplete((voidargs,error)->{ + try{ + if(error!=null){ + logger.error("Exception during receiver close phase"+error.toString()); + } + ehClient.closeSync(); + }catch (Exception e){ + logger.error("Exception during ehclient close phase"+e.toString()); + } + }).get(); + }catch (InterruptedException e){ + logger.error("Exception occured during close phase"+e.toString()); + }catch (ExecutionException e){ + logger.error("Exception occured during close phase"+e.toString()); + } logger.info("closed eventhub receiver: partitionId=" + partitionId ); receiver = null; + ehClient = null; } } - + + @Override public boolean isOpen() { return (receiver != null); } @Override - public EventData receive(long timeoutInMilliseconds) { + public EventDataWrap receive() { long start = System.currentTimeMillis(); - Message message = receiver.receive(timeoutInMilliseconds); + Iterable receivedEvents=null; + /*Get one message at a time for backward compatibility behaviour*/ + try { + receivedEvents = receiver.receiveSync(1); + }catch (ServiceBusException e){ + logger.error("Exception occured during receive"+e.toString()); + } long end = System.currentTimeMillis(); long millis = (end - start); receiveApiLatencyMean.update(millis); receiveApiCallCount.incr(); - - if (message == null) { - //Temporary workaround for AMQP/EH bug of failing to receive messages - /*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) { - throw new RuntimeException( - "Restart EventHubSpout due to failure of receiving messages in " - + millis + " millisecond"); - }*/ + + if (receivedEvents == null || receivedEvents.spliterator().getExactSizeIfKnown() == 0) { return null; } - receiveMessageCount.incr(); + EventData receivedEvent = receivedEvents.iterator().next(); + MessageId messageId = new MessageId(partitionId, + receivedEvent.getSystemProperties().getOffset(), + receivedEvent.getSystemProperties().getSequenceNumber()); - MessageId messageId = createMessageId(message); - return EventData.create(message, messageId); - } - - private MessageId createMessageId(Message message) { - String offset = null; - long sequenceNumber = 0; - - for (Section section : message.getPayload()) { - if (section instanceof MessageAnnotations) { - MessageAnnotations annotations = (MessageAnnotations) section; - HashMap annonationMap = (HashMap) annotations.getValue(); - - if (annonationMap.containsKey(OffsetKey)) { - offset = (String) annonationMap.get(OffsetKey); - } - - if (annonationMap.containsKey(SequenceNumberKey)) { - sequenceNumber = (Long) annonationMap.get(SequenceNumberKey); - } - } - } - - return MessageId.create(partitionId, offset, sequenceNumber); + return EventDataWrap.create(receivedEvent,messageId); } @Override diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java index 6adef420509..e970debb2f5 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java @@ -24,16 +24,14 @@ import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import org.apache.qpid.amqp_1_0.client.Message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class EventHubSpout extends BaseRichSpout { private static final Logger logger = LoggerFactory.getLogger(EventHubSpout.class); @@ -174,7 +172,7 @@ public Object getValueAndReset() { @Override public void nextTuple() { - EventData eventData = null; + EventDataWrap eventDatawrap = null; List partitionManagers = partitionCoordinator.getMyPartitionManagers(); for (int i = 0; i < partitionManagers.size(); i++) { @@ -185,20 +183,16 @@ public void nextTuple() { throw new RuntimeException("partitionManager doesn't exist."); } - eventData = partitionManager.receive(); + eventDatawrap = partitionManager.receive(); - if (eventData != null) { + if (eventDatawrap != null) { break; } } - - if (eventData != null) { - MessageId messageId = eventData.getMessageId(); - Message message = eventData.getMessage(); - - List tuples = scheme.deserialize(message); - + if (eventDatawrap != null) { + MessageId messageId = eventDatawrap.getMessageId(); + List tuples = scheme.deserialize(eventDatawrap.getEventData()); if (tuples != null) { collector.emit(tuples, messageId); } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java index e06953aa05b..5556970bfd9 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java @@ -17,10 +17,12 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.servicebus.ConnectionStringBuilder; + import java.io.Serializable; import java.util.ArrayList; import java.util.List; -import com.microsoft.eventhubs.client.ConnectionStringBuilder; public class EventHubSpoutConfig implements Serializable { private static final long serialVersionUID = 1L; @@ -42,8 +44,7 @@ public class EventHubSpoutConfig implements Serializable { private String connectionString; private String topologyName; private IEventDataScheme scheme = new StringEventDataScheme(); - private String consumerGroupName = null; // if null then use default - // consumer group + private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME; private String outputStreamId; @@ -52,8 +53,8 @@ public EventHubSpoutConfig(String username, String password, String namespace, String entityPath, int partitionCount) { this.userName = username; this.password = password; - this.connectionString = new ConnectionStringBuilder(username, password, - namespace).getConnectionString(); + this.connectionString = new ConnectionStringBuilder(namespace,entityPath, + username,password).toString(); this.namespace = namespace; this.entityPath = entityPath; this.partitionCount = partitionCount; @@ -232,9 +233,12 @@ public String getConnectionString() { return connectionString; } + /*Keeping it for backward compatibility*/ public void setTargetAddress(String targetFqnAddress) { - this.connectionString = new ConnectionStringBuilder(userName, password, - namespace, targetFqnAddress).getConnectionString(); + } + + public void setTargetAddress(){ + } public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) { diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java index b8101b966ab..6c7852410ca 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java @@ -17,10 +17,11 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; +import com.microsoft.azure.eventhubs.EventData; import org.apache.storm.tuple.Fields; + import java.io.Serializable; import java.util.List; -import org.apache.qpid.amqp_1_0.client.Message; public interface IEventDataScheme extends Serializable { @@ -29,10 +30,10 @@ public interface IEventDataScheme extends Serializable { * * @see #getOutputFields() for the list of fields the tuple will contain. * - * @param message The Message to Deserialize. + * @param eventData The EventData to Deserialize. * @return A tuple containing the deserialized fields of the message. */ - List deserialize(Message message); + List deserialize(EventData eventData); /** * Retrieve the Fields that are present on tuples created by this object. diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java index bc2db14101f..d094ca00ec8 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventHubReceiver.java @@ -17,20 +17,19 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import java.util.Map; - import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.eventhubs.client.IEventHubFilter; + +import java.util.Map; public interface IEventHubReceiver { - void open(IEventHubFilter filter) throws EventHubException; + void open(String offset) throws EventHubException; void close(); - + boolean isOpen(); - EventData receive(long timeoutInMilliseconds); - + EventDataWrap receive(); + Map getMetricsData(); } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java index ac986d3b7cb..845f50815bc 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IPartitionManager.java @@ -25,7 +25,7 @@ public interface IPartitionManager { void close(); - EventData receive(); + EventDataWrap receive(); void checkpoint(); diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java index 105474209e5..20e021a8a8a 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/PartitionManager.java @@ -29,9 +29,9 @@ public class PartitionManager extends SimplePartitionManager { private final int ehReceiveTimeoutMs = 5000; //all sent events are stored in pending - private final Map pending; + private final Map pending; //all failed events are put in toResend, which is sorted by event's offset - private final TreeSet toResend; + private final TreeSet toResend; public PartitionManager( EventHubSpoutConfig spoutConfig, @@ -41,29 +41,29 @@ public PartitionManager( super(spoutConfig, partitionId, stateStore, receiver); - this.pending = new LinkedHashMap(); - this.toResend = new TreeSet(); + this.pending = new LinkedHashMap(); + this.toResend = new TreeSet(); } @Override - public EventData receive() { + public EventDataWrap receive() { if(pending.size() >= config.getMaxPendingMsgsPerPartition()) { return null; } - EventData eventData; + EventDataWrap eventDatawrap; if (toResend.isEmpty()) { - eventData = receiver.receive(ehReceiveTimeoutMs); + eventDatawrap = receiver.receive(); } else { - eventData = toResend.pollFirst(); + eventDatawrap = toResend.pollFirst(); } - if (eventData != null) { - lastOffset = eventData.getMessageId().getOffset(); - pending.put(lastOffset, eventData); + if (eventDatawrap != null) { + lastOffset = eventDatawrap.getMessageId().getOffset(); + pending.put(lastOffset, eventDatawrap); } - return eventData; + return eventDatawrap; } @Override @@ -74,8 +74,8 @@ public void ack(String offset) { @Override public void fail(String offset) { logger.warn("fail on " + offset); - EventData eventData = pending.remove(offset); - toResend.add(eventData); + EventDataWrap eventDataWrap = pending.remove(offset); + toResend.add(eventDataWrap); } @Override diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/Serializedeserializeutil.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/Serializedeserializeutil.java new file mode 100644 index 00000000000..aee56ad343a --- /dev/null +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/Serializedeserializeutil.java @@ -0,0 +1,21 @@ +package org.apache.storm.eventhubs.spout; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; + +/** + * Created by rabaner on 3/22/2017. + */ +public class Serializedeserializeutil { + + public static byte[] serialize(Object obj) throws IOException { + try(ByteArrayOutputStream b = new ByteArrayOutputStream()){ + try(ObjectOutputStream o = new ObjectOutputStream(b)){ + o.writeObject(obj); + o.close(); + } + return b.toByteArray(); + } + } +} diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java index b66a7850818..9acb0a5fdf3 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/SimplePartitionManager.java @@ -17,16 +17,11 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import java.util.Map; - +import com.microsoft.eventhubs.client.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.microsoft.eventhubs.client.Constants; -import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter; -import com.microsoft.eventhubs.client.EventHubOffsetFilter; -import com.microsoft.eventhubs.client.IEventHubFilter; - +import java.util.Map; /** * A simple partition manager that does not re-send failed messages */ @@ -65,16 +60,13 @@ public void open() throws Exception { offset = Constants.DefaultStartingOffset; } - IEventHubFilter filter; if (offset.equals(Constants.DefaultStartingOffset) && config.getEnqueueTimeFilter() != 0) { - filter = new EventHubEnqueueTimeFilter(config.getEnqueueTimeFilter()); - } - else { - filter = new EventHubOffsetFilter(offset); + offset = Long.toString(config.getEnqueueTimeFilter()); } - receiver.open(filter); + + receiver.open(offset); } @Override @@ -98,12 +90,12 @@ protected String getCompletedOffset() { } @Override - public EventData receive() { - EventData eventData = receiver.receive(5000); - if (eventData != null) { - lastOffset = eventData.getMessageId().getOffset(); + public EventDataWrap receive() { + EventDataWrap eventDatawrap = receiver.receive(); + if (eventDatawrap != null) { + lastOffset = eventDatawrap.getEventData().getSystemProperties().getOffset(); } - return eventData; + return eventDatawrap; } @Override diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java index 0c6f8b6eda9..51e7efdc859 100644 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java @@ -17,48 +17,46 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; +import com.microsoft.azure.eventhubs.EventData; import org.apache.storm.tuple.Fields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.charset.Charset; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; - -import org.apache.qpid.amqp_1_0.client.Message; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.storm.tuple.Fields; /** * An Event Data Scheme which deserializes message payload into the Strings. - * No encoding is assumed. The receiver will need to handle parsing of the + * No encoding is assumed. The receiver will need to handle parsing of the * string data in appropriate encoding. * - * Note: Unlike other schemes provided, this scheme does not include any - * metadata. - * - * For metadata please refer to {@link BinaryEventDataScheme}, {@link EventDataScheme} + * Note: Unlike other schemes provided, this scheme does not include any + * metadata. + * + * For metadata please refer to {@link BinaryEventDataScheme}, {@link EventDataScheme} */ public class StringEventDataScheme implements IEventDataScheme { private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(StringEventDataScheme.class); @Override - public List deserialize(Message message) { + public List deserialize(EventData eventData) { final List fieldContents = new ArrayList(); - - for (Section section : message.getPayload()) { - if (section instanceof Data) { - Data data = (Data) section; - fieldContents.add(new String(data.getValue().getArray())); - } else if (section instanceof AmqpValue) { - AmqpValue amqpValue = (AmqpValue) section; - fieldContents.add(amqpValue.getValue().toString()); + String messageData = ""; + if(eventData.getBytes()!=null) + messageData = new String (eventData.getBytes()); + else if(eventData.getObject()!=null){ + try{ + messageData = new String(Serializedeserializeutil.serialize(eventData.getObject()),Charset.defaultCharset()); + }catch (IOException e){ + logger.error("Failed to serialize object"+e.toString()); } } - + + fieldContents.add(messageData); return fieldContents; } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java index fbe779d0627..069d819b855 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/ITridentPartitionManager.java @@ -17,9 +17,9 @@ *******************************************************************************/ package org.apache.storm.eventhubs.trident; -import java.util.List; +import org.apache.storm.eventhubs.spout.EventDataWrap; -import org.apache.storm.eventhubs.spout.EventData; +import java.util.List; public interface ITridentPartitionManager { boolean open(String offset); @@ -31,5 +31,5 @@ public interface ITridentPartitionManager { * @param count max number of messages in this batch * @return list of EventData, if failed to receive, return empty list */ - public List receiveBatch(String offset, int count); + public List receiveBatch(String offset, int count); } diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java index e5c1c5093c7..50be32075d3 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TransactionalTridentEventHubEmitter.java @@ -17,24 +17,17 @@ *******************************************************************************/ package org.apache.storm.eventhubs.trident; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.storm.eventhubs.spout.EventData; -import org.apache.storm.eventhubs.spout.EventHubReceiverImpl; -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; -import org.apache.storm.eventhubs.spout.IEventHubReceiver; -import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory; import com.microsoft.eventhubs.client.Constants; - +import org.apache.storm.eventhubs.spout.*; import org.apache.storm.trident.operation.TridentCollector; -import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout; import org.apache.storm.trident.spout.IPartitionedTridentSpout; import org.apache.storm.trident.topology.TransactionAttempt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class TransactionalTridentEventHubEmitter @@ -111,15 +104,15 @@ public void emitPartitionBatch(TransactionAttempt attempt, int count = Integer.parseInt((String)meta.get("count")); logger.info("re-emit for partition " + partition.getId() + ", offset=" + offset + ", count=" + count); ITridentPartitionManager pm = getOrCreatePartitionManager(partition); - List listEvents = pm.receiveBatch(offset, count); + List listEvents = pm.receiveBatch(offset, count); if(listEvents.size() != count) { logger.error("failed to refetch eventhub messages, new count=" + listEvents.size()); return; } - for(EventData ed: listEvents) { + for(EventDataWrap ed: listEvents) { List tuples = - spoutConfig.getEventDataScheme().deserialize(ed.getMessage()); + spoutConfig.getEventDataScheme().deserialize(ed.getEventData()); collector.emit(tuples); } } @@ -135,13 +128,13 @@ public Map emitPartitionBatchNew(TransactionAttempt attempt, //logger.info("emit for partition " + partition.getId() + ", offset=" + offset); String nextOffset = offset; - List listEvents = pm.receiveBatch(offset, batchSize); + List listEvents = pm.receiveBatch(offset, batchSize); - for(EventData ed: listEvents) { + for(EventDataWrap ed: listEvents) { //update nextOffset; nextOffset = ed.getMessageId().getOffset(); List tuples = - spoutConfig.getEventDataScheme().deserialize(ed.getMessage()); + spoutConfig.getEventDataScheme().deserialize(ed.getEventData()); collector.emit(tuples); } //logger.info("emitted new batches: " + listEvents.size()); diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java index 159fe418c9d..bc80b8b5f92 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/TridentPartitionManager.java @@ -17,20 +17,16 @@ *******************************************************************************/ package org.apache.storm.eventhubs.trident; -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.microsoft.eventhubs.client.Constants; -import com.microsoft.eventhubs.client.EventHubEnqueueTimeFilter; import com.microsoft.eventhubs.client.EventHubException; -import com.microsoft.eventhubs.client.EventHubOffsetFilter; - -import org.apache.storm.eventhubs.spout.EventData; +import org.apache.storm.eventhubs.spout.EventDataWrap; import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; import org.apache.storm.eventhubs.spout.IEventHubReceiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; public class TridentPartitionManager implements ITridentPartitionManager { private static final Logger logger = LoggerFactory.getLogger(TridentPartitionManager.class); @@ -49,10 +45,10 @@ public boolean open(String offset) { try { if((offset == null || offset.equals(Constants.DefaultStartingOffset)) && spoutConfig.getEnqueueTimeFilter() != 0) { - receiver.open(new EventHubEnqueueTimeFilter(spoutConfig.getEnqueueTimeFilter())); + receiver.open(offset); } else { - receiver.open(new EventHubOffsetFilter(offset)); + receiver.open(offset); } lastOffset = offset; return true; @@ -69,8 +65,8 @@ public void close() { } @Override - public List receiveBatch(String offset, int count) { - List batch = new ArrayList(count); + public List receiveBatch(String offset, int count) { + List batch = new ArrayList(count); if(!offset.equals(lastOffset) || !receiver.isOpen()) { //re-establish connection to eventhub servers using the right offset //TBD: might be optimized with cache. @@ -81,7 +77,7 @@ public List receiveBatch(String offset, int count) { } for(int i=0; i body = new ArrayList
(); + //the body of the message is "message" + currentOffset, e.g. "message123" - body.add(new Data(new Binary(("message" + currentOffset).getBytes()))); - Message m = new Message(body); + MessageId mid = new MessageId(partitionId, "" + currentOffset, currentOffset); - EventData ed = new EventData(m, mid); - return ed; + EventData ed = new EventData(("message" + currentOffset).getBytes()); + return EventDataWrap.create(ed,mid); } @Override diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java index dd63d5d23be..467461c9f3c 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/PartitionManagerCallerMock.java @@ -17,10 +17,6 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import org.apache.storm.eventhubs.spout.PartitionManager; -import org.apache.storm.eventhubs.spout.EventData; -import org.apache.storm.eventhubs.spout.EventHubSpoutConfig; - /** * This mock exercises PartitionManager */ @@ -71,7 +67,7 @@ public String execute(String callSequence) { count = Integer.parseInt(cmd.substring(1)); } for(int i=0; i 0); } diff --git a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java index 49e544bfa16..e4ddea33cba 100755 --- a/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java +++ b/external/storm-eventhubs/src/test/java/org/apache/storm/eventhubs/spout/TestEventHubSpout.java @@ -17,12 +17,12 @@ *******************************************************************************/ package org.apache.storm.eventhubs.spout; -import static org.junit.Assert.*; - import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class TestEventHubSpout { @@ -40,7 +40,7 @@ public void testSpoutConfig() { "namespace", "entityname", 16); conf.setZkConnectionString("zookeeper"); conf.setCheckpointIntervalInSeconds(1); - assertEquals(conf.getConnectionString(), "amqps://username:pas%5Cs%2Bw%2Ford@namespace.servicebus.windows.net"); + assertEquals(conf.getConnectionString(), "Endpoint=amqps://namespace.servicebus.windows.net;EntityPath=entityname;SharedAccessKeyName=username;SharedAccessKey=pas\\s+w/ord;OperationTimeout=PT1M;RetryPolicy=Default"); } @Test diff --git a/pom.xml b/pom.xml index b9fe6aaf732..bf781750bca 100644 --- a/pom.xml +++ b/pom.xml @@ -297,6 +297,7 @@ 3.1.0 1.0 0.32 + 0.13.0 1.0.1 2.24.1