Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable dead letter topic #848

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions daprdocs/content/en/java-sdk-docs/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,26 @@ public class SubscriberController {
});
}

/**
* Handles a registered publish endpoint on this app adding a topic which manage to forward undeliverable messages.
*
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
deadLetterTopic = "${deadLetterProperty:deadTopic}")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessageWithErrorHandler(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == 'myevent.v2'", priority = 1))
@PostMapping(path = "/testingtopicV2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ public class SubscriberController {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();


/**
* Handles a registered publish endpoint on this app.
*
*
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
Expand All @@ -58,6 +59,27 @@ public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String
});
}


/**
* Handles a registered publish endpoint on this app adding a topic which manage to forward undeliverable messages.
*
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
deadLetterTopic = "${deadLetterProperty:deadTopic}")
@PostMapping(path = "/testingtopic")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is breaking integration tests, please, use a different topic name and post mapping.

public Mono<Void> handleMessageWithErrorHandler(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber got: " + cloudEvent.getData());
System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}

/**
* Handles a registered publish endpoint on this app (version 2 of a cloud
* event).
Expand All @@ -66,6 +88,7 @@ public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String
* @return A message containing the time.
*/
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
deadLetterTopic = "${deadLetterProperty:deadTopic}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
Expand All @@ -84,7 +107,8 @@ public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent clou
* @param cloudEvent The cloud event received.
* @return A message containing the time.
*/
@Topic(name = "bulkpublishtesting", pubsubName = "${myAppProperty:messagebus}")
@Topic(name = "bulkpublishtesting", pubsubName = "${myAppProperty:messagebus}",
deadLetterTopic = "${deadLetterProperty:deadTopic}")
@PostMapping(path = "/bulkpublishtesting")
public Mono<Void> handleBulkPublishMessage(@RequestBody(required = false) CloudEvent cloudEvent) {
return Mono.fromRunnable(() -> {
Expand All @@ -104,7 +128,8 @@ public Mono<Void> handleBulkPublishMessage(@RequestBody(required = false) CloudE
* @return A list of responses for each event.
*/
@BulkSubscribe()
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
@Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}",
deadLetterTopic = "${deadLetterProperty:deadTopic}")
@PostMapping(path = "/testingtopicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ private static void subscribeToTopics(
Rule rule = topic.rule();
String topicName = stringValueResolver.resolveStringValue(topic.name());
String pubSubName = stringValueResolver.resolveStringValue(topic.pubsubName());
String deadLetterTopic = stringValueResolver.resolveStringValue(topic.deadLetterTopic());
String match = stringValueResolver.resolveStringValue(rule.match());
if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
try {
Expand All @@ -116,7 +117,7 @@ private static void subscribeToTopics(
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) {
daprRuntime.addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata, bulkSubscribe);
pubSubName, topicName, match, rule.priority(), route, deadLetterTopic, metadata, bulkSubscribe);
}
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e);
Expand Down
83 changes: 67 additions & 16 deletions sdk-springboot/src/main/java/io/dapr/springboot/DaprRuntime.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Internal Singleton to handle Dapr configuration.
*/
class DaprRuntime {

/**
* The singleton instance.
*/
Expand All @@ -33,8 +34,8 @@ class DaprRuntime {
private final Map<DaprTopicKey, DaprSubscriptionBuilder> subscriptionBuilders = new HashMap<>();

/**
* DaprRuntime should be used as a singleton, using {@link DaprRuntime#getInstance()}.
* The constructor's default scope is available for unit tests only.
* DaprRuntime should be used as a singleton, using {@link DaprRuntime#getInstance()}. The
* constructor's default scope is available for unit tests only.
*/
private DaprRuntime() {
}
Expand All @@ -60,38 +61,84 @@ public static DaprRuntime getInstance() {
* Adds a topic to the list of subscribed topics.
*
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
*/
public synchronized void addSubscribedTopic(String pubSubName,
String topicName,
String match,
int priority,
String route,
Map<String,String> metadata) {
Map<String, String> metadata) {
this.addSubscribedTopic(pubSubName, topicName, match, priority, route, metadata, null);
}

/**
* Adds a topic to the list of subscribed topics.
*
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
*/
public synchronized void addSubscribedTopic(String pubSubName,
String topicName,
String match,
int priority,
String route,
Map<String,String> metadata,
Map<String, String> metadata,
DaprTopicBulkSubscribe bulkSubscribe) {
this.addSubscribedTopic(pubSubName, topicName, match, priority, route, null,
metadata, bulkSubscribe);
}

/**
* Adds a topic to the list of subscribed topics.
*
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param metadata Metadata for extended subscription functionality.
*/
public synchronized void addSubscribedTopic(String pubSubName,
String topicName,
String match,
int priority,
String route,
String deadLetterTopic,
Map<String, String> metadata) {
this.addSubscribedTopic(pubSubName, topicName, match, priority, route, deadLetterTopic,
metadata, null);
}

/**
* Adds a topic to the list of subscribed topics.
*
* @param pubSubName PubSub name to subscribe to.
* @param topicName Name of the topic being subscribed to.
* @param match Match expression for this route.
* @param priority Priority for this match relative to others.
* @param route Destination route for requests.
* @param deadLetterTopic Name of topic to forward undeliverable messages.
* @param metadata Metadata for extended subscription functionality.
* @param bulkSubscribe Bulk subscribe configuration.
*/
public synchronized void addSubscribedTopic(String pubSubName,
String topicName,
String match,
int priority,
String route,
String deadLetterTopic,
Map<String, String> metadata,
DaprTopicBulkSubscribe bulkSubscribe) {
DaprTopicKey topicKey = new DaprTopicKey(pubSubName, topicName);

Expand All @@ -111,14 +158,18 @@ public synchronized void addSubscribedTopic(String pubSubName,
builder.setMetadata(metadata);
}

if (deadLetterTopic != null && !deadLetterTopic.isEmpty()) {
builder.setDeadLetterTopic(deadLetterTopic);
}

if (bulkSubscribe != null) {
builder.setBulkSubscribe(bulkSubscribe);
}
}

public synchronized DaprTopicSubscription[] listSubscribedTopics() {
List<DaprTopicSubscription> values = subscriptionBuilders.values().stream()
.map(b -> b.build()).collect(Collectors.toList());
.map(b -> b.build()).collect(Collectors.toList());
return values.toArray(new DaprTopicSubscription[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,65 +21,91 @@
import java.util.stream.Collectors;

class DaprSubscriptionBuilder {

private final String pubsubName;
private final String topic;
private final List<TopicRule> rules;
private String deadLetterTopic;
private String defaultPath;
private Map<String, String> metadata;

private DaprTopicBulkSubscribe bulkSubscribe;

/**
* Create a subscription topic.
*
* @param pubsubName The pubsub name to subscribe to.
* @param topic The topic to subscribe to.
* @param topic The topic to subscribe to.
*/
DaprSubscriptionBuilder(String pubsubName, String topic) {
this.pubsubName = pubsubName;
this.topic = topic;
this.rules = new ArrayList<>();
this.deadLetterTopic = null;
this.defaultPath = null;
this.metadata = Collections.emptyMap();
}

/**
* Sets the default path for the subscription.
*
* @param path The default path.
* @return this instance.
*/
DaprSubscriptionBuilder setDefaultPath(String path) {
if (defaultPath != null) {
if (!defaultPath.equals(path)) {
throw new RuntimeException(
String.format(
"a default route is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')",
this.topic, this.pubsubName, this.defaultPath, path));
String.format(
"a default route is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')",
this.topic, this.pubsubName, this.defaultPath, path));
}
}
defaultPath = path;
return this;
}

/**
* Sets the dead letter topic for the subscription.
*
* @param deadLetterTopic Name of dead letter topic.
* @return this instance.
*/
DaprSubscriptionBuilder setDeadLetterTopic(String deadLetterTopic) {
if (this.deadLetterTopic != null) {
if (!this.deadLetterTopic.equals(deadLetterTopic)) {
throw new RuntimeException(
String.format(
"a default dead letter topic is already set for topic %s on pubsub %s (current: '%s', supplied: '%s')",
this.topic, this.pubsubName, this.deadLetterTopic, deadLetterTopic));
}
}
this.deadLetterTopic = deadLetterTopic;
return this;
}

/**
* Adds a rule to the subscription.
* @param path The path to route to.
* @param match The CEL expression the event must match.
*
* @param path The path to route to.
* @param match The CEL expression the event must match.
* @param priority The priority of the rule.
* @return this instance.
*/
public DaprSubscriptionBuilder addRule(String path, String match, int priority) {
if (rules.stream().anyMatch(e -> e.getPriority() == priority)) {
throw new RuntimeException(
String.format(
"a rule priority of %d is already used for topic %s on pubsub %s",
priority, this.topic, this.pubsubName));
String.format(
"a rule priority of %d is already used for topic %s on pubsub %s",
priority, this.topic, this.pubsubName));
}
rules.add(new TopicRule(path, match, priority));
return this;
}

/**
* Sets the metadata for the subscription.
*
* @param metadata The metadata.
* @return this instance.
*/
Expand All @@ -90,6 +116,7 @@ public DaprSubscriptionBuilder setMetadata(Map<String, String> metadata) {

/**
* Sets the bulkSubscribe configuration for the subscription.
*
* @param bulkSubscribe The bulk subscribe configuration.
* @return this instance.
*/
Expand All @@ -100,6 +127,7 @@ public DaprSubscriptionBuilder setBulkSubscribe(DaprTopicBulkSubscribe bulkSubsc

/**
* Builds the DaprTopicSubscription that is returned by the application to Dapr.
*
* @return The DaprTopicSubscription.
*/
public DaprTopicSubscription build() {
Expand All @@ -109,16 +137,19 @@ public DaprTopicSubscription build() {
if (!rules.isEmpty()) {
Collections.sort(rules, Comparator.comparingInt(TopicRule::getPriority));
List<DaprTopicRule> topicRules = rules.stream()
.map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList());
.map(e -> new DaprTopicRule(e.match, e.path)).collect(Collectors.toList());
routes = new DaprTopicRoutes(topicRules, defaultPath);
} else {
route = defaultPath;
}

return new DaprTopicSubscription(this.pubsubName, this.topic, route, routes, metadata, bulkSubscribe);
return new DaprTopicSubscription(this.pubsubName, this.topic, route, this.deadLetterTopic,
routes, metadata,
bulkSubscribe);
}

private static class TopicRule {

private final String path;
private final String match;
private final int priority;
Expand Down
Loading