Skip to content

Commit

Permalink
handle callback in event-over-http
Browse files Browse the repository at this point in the history
  • Loading branch information
acn-ericlaw committed Mar 13, 2024
1 parent c09a368 commit 15ed2c9
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 8 deletions.
10 changes: 9 additions & 1 deletion connectors/adapters/kafka/kafka-standalone/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<snakeyaml.version>2.2</snakeyaml.version>
<hazelcast.version>5.3.6</hazelcast.version>
<kafka.version>3.6.1</kafka.version>
<kafka.version>3.7.0</kafka.version>
<gson.version>2.10.1</gson.version>
<netty.version>4.1.105.Final</netty.version>
<slf4j.version>2.0.11</slf4j.version>
Expand Down Expand Up @@ -58,6 +58,14 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ public String getError() {
// body is used to store error message if status is not 200
if (body == null) {
return "null";
} else if (body instanceof byte[]) {
return "***";
} else {
return body instanceof String? (String) body : body.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,11 +649,13 @@ public String substituteRouteIfAny(String to) {
}

public String getEventHttpTarget(String route) {
return eventHttpTargets.get(route);
int slash = route.indexOf('@');
return eventHttpTargets.get(slash == -1? route : route.substring(0, slash));
}

public Map<String, String> getEventHttpHeaders(String route) {
return eventHttpHeaders.get(route);
int slash = route.indexOf('@');
return eventHttpHeaders.get(slash == -1? route : route.substring(0, slash));
}

/**
Expand All @@ -671,13 +673,28 @@ public void send(final EventEnvelope event) throws IOException {
event.setTo(to);
String targetHttp = event.getHeader("_") == null? getEventHttpTarget(to) : null;
if (targetHttp != null) {
String callback = event.getReplyTo();
event.setReplyTo(null);
EventEnvelope forwardEvent = new EventEnvelope(event.toMap()).setHeader("_", "async");
Future<EventEnvelope> response = asyncRequest(forwardEvent, ASYNC_EVENT_HTTP_TIMEOUT,
getEventHttpHeaders(to), targetHttp, false);
getEventHttpHeaders(to), targetHttp, callback != null);
response.onSuccess(evt -> {
if (evt.getStatus() != 202) {
log.error("Error in sending async event {} to {} - status={}, error={}",
to, targetHttp, evt.getStatus(), evt.getError());
if (callback != null) {
// Send the RPC response from the remote target service to the callback
evt.setTo(callback).setReplyTo(null).setFrom(to)
.setTrace(event.getTraceId(), event.getTracePath())
.setCorrelationId(event.getCorrelationId());
try {
send(evt);
} catch (IOException e) {
log.error("Error in sending callback event {} from {} to {} - {}",
to, targetHttp, callback, e.getMessage());
}
} else {
if (evt.getStatus() != 202) {
log.error("Error in sending async event {} to {} - status={}, error={}",
to, targetHttp, evt.getStatus(), evt.getError());
}
}
});
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public void declarativeEventOverHttpTest() throws IOException, ExecutionExceptio
.setReplyTo(BLOCKING_EVENT_WAIT);
PostOffice po = new PostOffice("unit.test", "1200001", "EVENT /save/then/get");
po.send(save);
wait1.poll(5, TimeUnit.SECONDS);
Object serviceResponse = wait1.poll(5, TimeUnit.SECONDS);
Assert.assertEquals("saved", serviceResponse);
EventEnvelope get = new EventEnvelope().setTo(ROUTE).setHeader("type", "get");
Future<EventEnvelope> response2 = po.asyncRequest(get, 10000);
response2.onSuccess(evt -> wait2.offer(evt.getBody()));
Expand Down

0 comments on commit 15ed2c9

Please sign in to comment.