Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion external/storm-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
</parent>

<artifactId>storm-eventhubs</artifactId>
<version>2.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>storm-eventhubs</name>
<description>EventHubs Storm Spout</description>
Expand Down Expand Up @@ -94,6 +93,11 @@
<artifactId>eventhubs-client</artifactId>
<version>${eventhubs.client.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>0.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.microsoft.eventhubs.client.EventHubClient;
import com.microsoft.azure.eventhubs.*;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.*; [](start = 36, length = 3)

nit: keep them expanded - usual pattern in OS is to specifically import

import com.microsoft.eventhubs.client.EventHubException;
import com.microsoft.eventhubs.client.EventHubSender;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -41,7 +39,8 @@ public class EventHubBolt extends BaseRichBolt {
.getLogger(EventHubBolt.class);

protected OutputCollector collector;
protected EventHubSender sender;
protected PartitionSender sender=null;
protected EventHubClient ehClient=null;
protected EventHubBoltConfig boltConfig;

public EventHubBolt(String connectionString, String entityPath) {
Expand Down Expand Up @@ -70,10 +69,9 @@ public void prepare(Map config, TopologyContext context,
logger.info("creating sender: " + boltConfig.getConnectionString()
+ ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
try {
EventHubClient eventHubClient = EventHubClient.create(
boltConfig.getConnectionString(),
boltConfig.getEntityPath());
sender = eventHubClient.createPartitionSender(myPartitionId);
ehClient = EventHubClient.createFromConnectionString(boltConfig.getConnectionString()).get();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get [](start = 90, length = 3)

use methods ending with sync(...) - which could return cleaner exception stacks

if (boltConfig.getPartitionMode())
sender = ehClient.createPartitionSender(Integer.toString(context.getThisTaskIndex())).get();
} catch (Exception ex) {
collector.reportError(ex);
throw new RuntimeException(ex);
Expand All @@ -84,12 +82,39 @@ public void prepare(Map config, TopologyContext context,
@Override
public void execute(Tuple tuple) {
try {
sender.send(boltConfig.getEventDataFormat().serialize(tuple));
EventData sendEvent = new EventData(boltConfig.getEventDataFormat().serialize(tuple));
if(boltConfig.getPartitionMode() && sender!=null)
sender.send(sendEvent).get();
else if(boltConfig.getPartitionMode() && sender==null)
throw new EventHubException("Sender is null");
else if(!boltConfig.getPartitionMode() && ehClient!=null)
ehClient.send(sendEvent).get();
else if(!boltConfig.getPartitionMode() && ehClient==null)
throw new EventHubException("ehclient is null");
collector.ack(tuple);
} catch (EventHubException ex) {
collector.reportError(ex);
collector.fail(tuple);
}
catch (Exception e){

}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why catch-all ?
This could potentially make this bolt a blackhole :)

}

@Override
public void cleanup() {
try{
if(sender!=null) {
sender.close().get();
sender = null;
}
if(ehClient!=null){
ehClient.close().get();
ehClient = null;
}
}catch (Exception e){
logger.error("Exception occured during close phase"+e.toString());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.io.Serializable;

import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
import com.microsoft.eventhubs.client.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.ConnectionStringBuilder;

/*
* EventHubs bolt configurations
Expand Down Expand Up @@ -81,8 +81,8 @@ public EventHubBoltConfig(String userName, String password, String namespace,
public EventHubBoltConfig(String userName, String password, String namespace,
String targetFqnAddress, String entityPath, boolean partitionMode,
IEventDataFormat dataFormat) {
this.connectionString = new ConnectionStringBuilder(userName, password,
namespace, targetFqnAddress).getConnectionString();
this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
userName,password).toString();
this.entityPath = entityPath;
this.partitionMode = partitionMode;
this.dataFormat = dataFormat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
*******************************************************************************/
package org.apache.storm.eventhubs.spout;

import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import org.apache.qpid.amqp_1_0.client.Message;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.messaging.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,7 +34,10 @@
import com.microsoft.eventhubs.client.IEventHubFilter;
import com.microsoft.eventhubs.client.ResilientEventHubReceiver;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.qpid.amqp_1_0.type.Section;
Expand All @@ -39,24 +46,21 @@

public class EventHubReceiverImpl implements IEventHubReceiver {
private static final Logger logger = LoggerFactory.getLogger(EventHubReceiverImpl.class);
private static final Symbol OffsetKey = Symbol.valueOf(Constants.OffsetKey);
private static final Symbol SequenceNumberKey = Symbol.valueOf(Constants.SequenceNumberKey);

private final String connectionString;
private final String entityName;
private final String partitionId;
private final int defaultCredits;
private final String consumerGroupName;

private ResilientEventHubReceiver receiver;
private PartitionReceiver receiver;
private EventHubClient ehClient=null;
private ReducedMetric receiveApiLatencyMean;
private CountMetric receiveApiCallCount;
private CountMetric receiveMessageCount;

public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) {
this.connectionString = config.getConnectionString();
this.entityName = config.getEntityPath();
this.defaultCredits = config.getReceiverCredits();
this.partitionId = partitionId;
this.consumerGroupName = config.getConsumerGroupName();
receiveApiLatencyMean = new ReducedMetric(new MeanReducer());
Expand All @@ -65,24 +69,38 @@ public EventHubReceiverImpl(EventHubSpoutConfig config, String partitionId) {
}

@Override
public void open(IEventHubFilter filter) throws EventHubException {
logger.info("creating eventhub receiver: partitionId=" + partitionId +
", filterString=" + filter.getFilterString());
public void open(String offset) throws EventHubException {
logger.info("creating eventhub receiver: partitionId=" + partitionId +
", offset=" + offset);
long start = System.currentTimeMillis();
receiver = new ResilientEventHubReceiver(connectionString, entityName,
partitionId, consumerGroupName, defaultCredits, filter);
receiver.initialize();

try {
ehClient = EventHubClient.createFromConnectionString(connectionString).get();
receiver = ehClient.createEpochReceiver(
consumerGroupName,
partitionId,
offset,
false,
1).get();
}catch (Exception e){
logger.info("Exception in creating EventhubClient"+e.toString());
}
long end = System.currentTimeMillis();
logger.info("created eventhub receiver, time taken(ms): " + (end-start));
}

@Override
public void close() {
public void close(){
if(receiver != null) {
receiver.close();
try {
receiver.close().get();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.get() [](start = 24, length = 6)

if receiver.close() fails - this results into connection leak; handle this case. something like:
receiver.whenComplete(() -> { ehClient.closeSync() }).get();

if(ehClient!=null)
ehClient.close().get();
}catch (Exception e){
logger.error("Exception occured during close phase"+e.toString());
}
logger.info("closed eventhub receiver: partitionId=" + partitionId );
receiver = null;
ehClient = null;
}
}

Expand All @@ -92,50 +110,34 @@ public boolean isOpen() {
}

@Override
public EventData receive(long timeoutInMilliseconds) {
public EventData receive() {
long start = System.currentTimeMillis();
Message message = receiver.receive(timeoutInMilliseconds);
Iterable<com.microsoft.azure.eventhubs.EventData> receivedEvents=null;
/*Get one message at a time for backward compatibility behaviour*/
try {
receivedEvents = receiver.receive(1).get();
}catch (Exception e){
logger.error("Exception occured during receive"+e.toString());
}
long end = System.currentTimeMillis();
long millis = (end - start);
receiveApiLatencyMean.update(millis);
receiveApiCallCount.incr();

if (message == null) {
//Temporary workaround for AMQP/EH bug of failing to receive messages
/*if(timeoutInMilliseconds > 100 && millis < timeoutInMilliseconds/2) {
throw new RuntimeException(
"Restart EventHubSpout due to failure of receiving messages in "
+ millis + " millisecond");
}*/
if (receivedEvents == null) {
return null;
}

receiveMessageCount.incr();

MessageId messageId = createMessageId(message);
return EventData.create(message, messageId);
}

private MessageId createMessageId(Message message) {
String offset = null;
long sequenceNumber = 0;

for (Section section : message.getPayload()) {
if (section instanceof MessageAnnotations) {
MessageAnnotations annotations = (MessageAnnotations) section;
HashMap annonationMap = (HashMap) annotations.getValue();

if (annonationMap.containsKey(OffsetKey)) {
offset = (String) annonationMap.get(OffsetKey);
}

if (annonationMap.containsKey(SequenceNumberKey)) {
sequenceNumber = (Long) annonationMap.get(SequenceNumberKey);
}
}
MessageId messageId=null;
Message message=null;
for (com.microsoft.azure.eventhubs.EventData receivedEvent : receivedEvents) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (com.microsoft.azure.eventhubs.EventData receivedEvent : receivedEvents) { [](start = 4, length = 78)

Why ? In the receive(1) call above code - you explicitly specified receive 1 message - so remove this..

messageId = new MessageId(partitionId,
receivedEvent.getSystemProperties().getOffset(),
receivedEvent.getSystemProperties().getSequenceNumber());
List<Section> body = new ArrayList<Section>();
body.add(new Data(new Binary((new String(receivedEvent.getBody(), Charset.defaultCharset())).getBytes())));
message = new Message(body);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not equivalent to the existing behavior. Message could have other amqp sections - ApplicationProperties and SystemProperties.
Ideally, you should return com.microsoft.azure.eventhubs.EventData. Please remove the EventData type created in the spout library - to eliminate confusion.

}

return MessageId.create(partitionId, offset, sequenceNumber);
return org.apache.storm.eventhubs.spout.EventData.create(message, messageId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import com.microsoft.eventhubs.client.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.servicebus.ConnectionStringBuilder;

public class EventHubSpoutConfig implements Serializable {
private static final long serialVersionUID = 1L;
Expand All @@ -42,8 +43,7 @@ public class EventHubSpoutConfig implements Serializable {
private String connectionString;
private String topologyName;
private IEventDataScheme scheme = new StringEventDataScheme();
private String consumerGroupName = null; // if null then use default
// consumer group
private String consumerGroupName = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
private String outputStreamId;


Expand All @@ -52,8 +52,8 @@ public EventHubSpoutConfig(String username, String password,
String namespace, String entityPath, int partitionCount) {
this.userName = username;
this.password = password;
this.connectionString = new ConnectionStringBuilder(username, password,
namespace).getConnectionString();
this.connectionString = new ConnectionStringBuilder(namespace,entityPath,
username,password).toString();
this.namespace = namespace;
this.entityPath = entityPath;
this.partitionCount = partitionCount;
Expand Down Expand Up @@ -232,9 +232,12 @@ public String getConnectionString() {
return connectionString;
}

/*Keeping it for backward compatibility*/
public void setTargetAddress(String targetFqnAddress) {
this.connectionString = new ConnectionStringBuilder(userName, password,
namespace, targetFqnAddress).getConnectionString();
}

public void setTargetAddress(){

}

public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@

public interface IEventHubReceiver {

void open(IEventHubFilter filter) throws EventHubException;
void open(String offset) throws EventHubException;

void close();

boolean isOpen();

EventData receive(long timeoutInMilliseconds);
EventData receive();

Map getMetricsData();
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public EventData receive() {

EventData eventData;
if (toResend.isEmpty()) {
eventData = receiver.receive(ehReceiveTimeoutMs);
eventData = receiver.receive();
} else {
eventData = toResend.pollFirst();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,13 @@ public void open() throws Exception {
offset = Constants.DefaultStartingOffset;
}

IEventHubFilter filter;
if (offset.equals(Constants.DefaultStartingOffset)
&& config.getEnqueueTimeFilter() != 0) {
filter = new EventHubEnqueueTimeFilter(config.getEnqueueTimeFilter());
}
else {
filter = new EventHubOffsetFilter(offset);
offset = Long.toString(config.getEnqueueTimeFilter());
}

receiver.open(filter);

receiver.open(offset);
}

@Override
Expand All @@ -99,7 +96,7 @@ protected String getCompletedOffset() {

@Override
public EventData receive() {
EventData eventData = receiver.receive(5000);
EventData eventData = receiver.receive();
if (eventData != null) {
lastOffset = eventData.getMessageId().getOffset();
}
Expand Down
Loading