Skip to content

Commit

Permalink
Asyncapi sse kafka proxy (#1099)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmaidics authored Jun 18, 2024
1 parent 4e36eed commit ef0ee25
Show file tree
Hide file tree
Showing 43 changed files with 1,046 additions and 143 deletions.
13 changes: 13 additions & 0 deletions runtime/binding-asyncapi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-sse-kafka</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-tls</artifactId>
Expand Down Expand Up @@ -174,6 +180,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-sse-kafka</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.LongFunction;
Expand All @@ -44,10 +45,13 @@
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiParser;
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiSchemaConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiBinding;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.HttpHeaderFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.String8FW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.stream.HttpBeginExFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.stream.SseBeginExFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
Expand All @@ -57,20 +61,26 @@

public final class AsyncapiBindingConfig
{
public static final String SEND_OPERATION = "send";
public static final String RECEIVE_OPERATION = "receive";
public final long id;
public final String name;
public final KindConfig kind;
public final AsyncapiOptionsConfig options;
public final List<AsyncapiRouteConfig> routes;

private final Int2ObjectHashMap<String> typesByNamespaceId;
private final Int2ObjectHashMap<NamespaceConfig> composites;
private final Int2ObjectHashMap<CompositeNamespace> composites;
private final Long2LongHashMap apiIdsByNamespaceId;
private final AsyncapiNamespaceGenerator namespaceGenerator;
private final Long2LongHashMap compositeResolvedIds;
private final Object2LongHashMap<String> compositeResolvedIds;
private final Object2ObjectHashMap<Matcher, String> paths;
private final Object2ObjectHashMap<Matcher, String> topics;
private final Object2LongHashMap<String> schemaIdsByApiId;
private final Map<CharSequence, String> operationIds;
private final Map<String, Map<String, AsyncapiBinding>> operationBindings;
private final Map<CharSequence, String> receiveOperationIds;
private final Map<CharSequence, String> sendOperationIds;
private final LongFunction<CatalogHandler> supplyCatalog;
private final ToLongFunction<String> resolveId;
private final Consumer<NamespaceConfig> attach;
Expand All @@ -97,11 +107,15 @@ public AsyncapiBindingConfig(
this.options = (AsyncapiOptionsConfig) binding.options;
this.composites = new Int2ObjectHashMap<>();
this.apiIdsByNamespaceId = new Long2LongHashMap(-1);
this.compositeResolvedIds = new Long2LongHashMap(-1);
this.compositeResolvedIds = new Object2LongHashMap<>(-1);
this.schemaIdsByApiId = new Object2LongHashMap<>(-1);
this.typesByNamespaceId = new Int2ObjectHashMap<>();
this.paths = new Object2ObjectHashMap<>();
this.topics = new Object2ObjectHashMap<>();
this.operationIds = new TreeMap<>(CharSequence::compare);
this.operationBindings = new HashMap<>();
this.receiveOperationIds = new TreeMap<>(CharSequence::compare);
this.sendOperationIds = new TreeMap<>(CharSequence::compare);
this.helper = new HttpHeaderHelper();
this.parser = new AsyncapiParser();
this.attach = attachComposite;
Expand All @@ -115,16 +129,11 @@ public boolean isCompositeOriginId(
return typesByNamespaceId.containsKey(NamespacedId.namespaceId(originId));
}

public String getCompositeOriginType(
long originId)
{
return typesByNamespaceId.get(NamespacedId.namespaceId(originId));
}

public long resolveCompositeResolvedId(
long apiId)
long apiId,
String type)
{
return overrideRouteId != -1 ? overrideRouteId : compositeResolvedIds.get(apiId);
return overrideRouteId != -1 ? overrideRouteId : compositeResolvedIds.get(apiId + type);
}

public long resolveApiId(
Expand All @@ -139,7 +148,48 @@ public long resolveApiId(
return schemaIdsByApiId.get(apiId);
}

public String resolveOperationId(
public String resolveMqttOperationId(
MqttBeginExFW mqttBeginEx)
{
String topic;
String operationId = null;

switch (mqttBeginEx.kind())
{
case MqttBeginExFW.KIND_PUBLISH:
topic = mqttBeginEx.publish().topic().asString();
for (Map.Entry<Matcher, String> item : paths.entrySet())
{
Matcher matcher = item.getKey();
matcher.reset(topic);
if (matcher.find())
{
String channelName = item.getValue();
operationId = sendOperationIds.get(channelName);
break;
}
}
break;
case MqttBeginExFW.KIND_SUBSCRIBE:
topic = mqttBeginEx.subscribe().filters().matchFirst(x -> true).pattern().asString();
for (Map.Entry<Matcher, String> item : paths.entrySet())
{
Matcher matcher = item.getKey();
matcher.reset(topic);
if (matcher.find())
{
String channelName = item.getValue();
operationId = receiveOperationIds.get(channelName);
break;
}
}
break;
}

return operationId;
}

public String resolveHttpOperationId(
HttpBeginExFW httpBeginEx)
{
helper.visit(httpBeginEx);
Expand All @@ -161,6 +211,26 @@ public String resolveOperationId(
return operationId;
}

public String resolveSseOperationId(
SseBeginExFW sseBeginEx)
{
String operationId = null;

for (Map.Entry<Matcher, String> item : paths.entrySet())
{
Matcher matcher = item.getKey();
matcher.reset(sseBeginEx.path().asString());
if (matcher.find())
{
String channelName = item.getValue();
operationId = operationIds.get(channelName);
break;
}
}

return operationId;
}

public AsyncapiRouteConfig resolve(
long authorization)
{
Expand Down Expand Up @@ -194,33 +264,34 @@ public void attach(
attachServerClientBinding(binding, configs);
}

for (Map.Entry<Integer, NamespaceConfig> entry : composites.entrySet())
for (Map.Entry<Integer, CompositeNamespace> entry : composites.entrySet())
{
Integer k = entry.getKey();
NamespaceConfig v = entry.getValue();
CompositeNamespace v = entry.getValue();
NamespaceConfig namespaceConfig = v.composite;
List<BindingConfig> bindings;
boolean containsSse = v.bindings.stream().anyMatch(b -> b.type.equals("sse"));
boolean containsSse = namespaceConfig.bindings.stream().anyMatch(b -> b.type.equals("sse"));
if (containsSse)
{
if (binding.kind.equals(SERVER))
{
bindings = v.bindings.stream()
bindings = namespaceConfig.bindings.stream()
.filter(b -> b.type.equals("http") || b.type.equals("http-kafka") || b.type.equals("sse"))
.collect(toList());
}
else
{
bindings = v.bindings.stream()
bindings = namespaceConfig.bindings.stream()
.filter(b -> b.type.equals("sse"))
.collect(toList());
}
}
else
{
bindings = v.bindings.stream()
bindings = namespaceConfig.bindings.stream()
.filter(b -> b.type.equals("mqtt") || b.type.equals("http") || b.type.equals("sse") ||
b.type.equals("kafka") && b.kind == CACHE_CLIENT || b.type.equals("mqtt-kafka") ||
b.type.equals("http-kafka"))
b.type.equals("http-kafka") || b.type.equals("sse-kafka"))
.collect(toList());
}

Expand All @@ -231,7 +302,7 @@ public void attach(

public void detach()
{
composites.forEach((k, v) -> detach.accept(v));
composites.forEach((k, v) -> detach.accept(v.composite));
composites.clear();
}

Expand Down Expand Up @@ -291,7 +362,7 @@ private void updateNamespace(
{
configs.forEach(c ->
{
composites.put(c.schemaId, composite);
composites.put(c.schemaId, new CompositeNamespace(composite, c.asyncapi.operations.keySet()));
schemaIdsByApiId.put(c.apiLabel, c.schemaId);
});
asyncapis.forEach(this::extractChannels);
Expand All @@ -314,7 +385,11 @@ private void extractResolveId(
int schemaId,
List<BindingConfig> bindings)
{
bindings.forEach(b -> compositeResolvedIds.put(schemaId, b.id));
bindings.forEach(b ->
{
String operationType = b.type.replace("-kafka", "");
compositeResolvedIds.put(schemaId + operationType, b.id);
});
}

private void extractOperations(
Expand All @@ -324,6 +399,16 @@ private void extractOperations(
{
String[] refParts = v.channel.ref.split("/");
operationIds.put(refParts[refParts.length - 1], k);
if (SEND_OPERATION.equals(v.action))
{
sendOperationIds.put(refParts[refParts.length - 1], k);
}
else if (RECEIVE_OPERATION.equals(v.action))
{
receiveOperationIds.put(refParts[refParts.length - 1], k);
}

operationBindings.put(k, v.bindings);
});
}

Expand Down Expand Up @@ -443,4 +528,18 @@ private void visitAuthority(
authority = authorityRO.wrap(value.buffer(), value.offset(), value.limit());
}
}

static class CompositeNamespace
{
NamespaceConfig composite;
Set<String> operations;

CompositeNamespace(
NamespaceConfig composite,
Set<String> operations)
{
this.composite = composite;
this.operations = operations;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected <C> BindingConfigBuilder<C> injectProxyRoutes(
for (AsyncapiConditionConfig condition : route.when)
{
final Asyncapi httpAsyncapi = asyncapis.get(condition.apiId);
if (httpAsyncapi.servers.values().stream().anyMatch(s -> !s.protocol.startsWith(ASYNCAPI_HTTP_PROTOCOL_NAME)))
if (httpAsyncapi.servers.values().stream().noneMatch(s -> s.protocol.startsWith(ASYNCAPI_HTTP_PROTOCOL_NAME)))
{
break inject;
}
Expand Down Expand Up @@ -115,47 +115,50 @@ private <C> BindingConfigBuilder<C> addHttpKafkaRoute(

final AsyncapiChannelView channel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel);
String path = channel.address();
String method = whenOperation.bindings.get("http").method;
final List<String> paramNames = findParams(path);
if (whenOperation.bindings != null)
{
String method = whenOperation.bindings.get("http").method;
final List<String> paramNames = findParams(path);

AsyncapiChannelView httpChannel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel);
AsyncapiChannelView httpChannel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel);

boolean async = httpChannel.messages().values()
.stream().anyMatch(asyncapiMessage ->
{
AsyncapiMessageView message =
AsyncapiMessageView.of(httpAsyncapi.components.messages, asyncapiMessage);
return message.correlationId() != null;
});
boolean async = httpChannel.messages().values()
.stream().anyMatch(asyncapiMessage ->
{
AsyncapiMessageView message =
AsyncapiMessageView.of(httpAsyncapi.components.messages, asyncapiMessage);
return message.correlationId() != null;
});

if (async)
{
for (AsyncapiOperation operation : httpAsyncapi.operations.values())
if (async)
{
AsyncapiChannelView channelView = AsyncapiChannelView.of(httpAsyncapi.channels, operation.channel);
if (parameters.reset(channelView.address()).find())
for (AsyncapiOperation operation : httpAsyncapi.operations.values())
{
AsyncapiReply reply = withOperation.reply;
if (reply != null)
AsyncapiChannelView channelView = AsyncapiChannelView.of(httpAsyncapi.channels, operation.channel);
if (parameters.reset(channelView.address()).find())
{
final RouteConfigBuilder<BindingConfigBuilder<C>> asyncRouteBuilder = binding.route();
binding = addAsyncOperation(asyncRouteBuilder, httpAsyncapi, kafkaAsyncapi, operation,
withOperation);
AsyncapiReply reply = withOperation.reply;
if (reply != null)
{
final RouteConfigBuilder<BindingConfigBuilder<C>> asyncRouteBuilder = binding.route();
binding = addAsyncOperation(asyncRouteBuilder, httpAsyncapi, kafkaAsyncapi, operation,
withOperation);
}
}
}
}
}

final RouteConfigBuilder<BindingConfigBuilder<C>> routeBuilder = binding.route();
routeBuilder
.exit(qname)
.when(HttpKafkaConditionConfig::builder)
.method(method)
.path(path)
.build()
.inject(r -> injectHttpKafkaRouteWith(r, httpAsyncapi, kafkaAsyncapi, whenOperation,
withOperation, paramNames));
binding = routeBuilder.build();
final RouteConfigBuilder<BindingConfigBuilder<C>> routeBuilder = binding.route();
routeBuilder
.exit(qname)
.when(HttpKafkaConditionConfig::builder)
.method(method)
.path(path)
.build()
.inject(r -> injectHttpKafkaRouteWith(r, httpAsyncapi, kafkaAsyncapi, whenOperation,
withOperation, paramNames));
binding = routeBuilder.build();
}
return binding;
}

Expand Down
Loading

0 comments on commit ef0ee25

Please sign in to comment.