Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asyncapi sse kafka proxy #1099

Merged
merged 4 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions runtime/binding-asyncapi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-sse-kafka</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-tls</artifactId>
Expand Down Expand Up @@ -174,6 +180,13 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.aklivity.zilla</groupId>
<artifactId>binding-sse-kafka</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.LongFunction;
Expand All @@ -44,10 +45,13 @@
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiParser;
import io.aklivity.zilla.runtime.binding.asyncapi.config.AsyncapiSchemaConfig;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.Asyncapi;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.model.AsyncapiBinding;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.HttpHeaderFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.String8FW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.stream.HttpBeginExFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.stream.MqttBeginExFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.types.stream.SseBeginExFW;
import io.aklivity.zilla.runtime.binding.asyncapi.internal.view.AsyncapiServerView;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.BindingConfig;
Expand All @@ -57,20 +61,26 @@

public final class AsyncapiBindingConfig
{
public static final String SEND_OPERATION = "send";
public static final String RECEIVE_OPERATION = "receive";
public final long id;
public final String name;
public final KindConfig kind;
public final AsyncapiOptionsConfig options;
public final List<AsyncapiRouteConfig> routes;

private final Int2ObjectHashMap<String> typesByNamespaceId;
private final Int2ObjectHashMap<NamespaceConfig> composites;
private final Int2ObjectHashMap<CompositeNamespace> composites;
private final Long2LongHashMap apiIdsByNamespaceId;
private final AsyncapiNamespaceGenerator namespaceGenerator;
private final Long2LongHashMap compositeResolvedIds;
private final Object2LongHashMap<String> compositeResolvedIds;
private final Object2ObjectHashMap<Matcher, String> paths;
private final Object2ObjectHashMap<Matcher, String> topics;
private final Object2LongHashMap<String> schemaIdsByApiId;
private final Map<CharSequence, String> operationIds;
private final Map<String, Map<String, AsyncapiBinding>> operationBindings;
private final Map<CharSequence, String> receiveOperationIds;
private final Map<CharSequence, String> sendOperationIds;
private final LongFunction<CatalogHandler> supplyCatalog;
private final ToLongFunction<String> resolveId;
private final Consumer<NamespaceConfig> attach;
Expand All @@ -97,11 +107,15 @@ public AsyncapiBindingConfig(
this.options = (AsyncapiOptionsConfig) binding.options;
this.composites = new Int2ObjectHashMap<>();
this.apiIdsByNamespaceId = new Long2LongHashMap(-1);
this.compositeResolvedIds = new Long2LongHashMap(-1);
this.compositeResolvedIds = new Object2LongHashMap<>(-1);
this.schemaIdsByApiId = new Object2LongHashMap<>(-1);
this.typesByNamespaceId = new Int2ObjectHashMap<>();
this.paths = new Object2ObjectHashMap<>();
this.topics = new Object2ObjectHashMap<>();
this.operationIds = new TreeMap<>(CharSequence::compare);
this.operationBindings = new HashMap<>();
this.receiveOperationIds = new TreeMap<>(CharSequence::compare);
this.sendOperationIds = new TreeMap<>(CharSequence::compare);
this.helper = new HttpHeaderHelper();
this.parser = new AsyncapiParser();
this.attach = attachComposite;
Expand All @@ -115,16 +129,11 @@ public boolean isCompositeOriginId(
return typesByNamespaceId.containsKey(NamespacedId.namespaceId(originId));
}

public String getCompositeOriginType(
long originId)
{
return typesByNamespaceId.get(NamespacedId.namespaceId(originId));
}

public long resolveCompositeResolvedId(
long apiId)
long apiId,
String type)
{
return overrideRouteId != -1 ? overrideRouteId : compositeResolvedIds.get(apiId);
return overrideRouteId != -1 ? overrideRouteId : compositeResolvedIds.get(apiId + type);
}

public long resolveApiId(
Expand All @@ -139,7 +148,48 @@ public long resolveApiId(
return schemaIdsByApiId.get(apiId);
}

public String resolveOperationId(
public String resolveMqttOperationId(
MqttBeginExFW mqttBeginEx)
{
String topic;
String operationId = null;

switch (mqttBeginEx.kind())
{
case MqttBeginExFW.KIND_PUBLISH:
topic = mqttBeginEx.publish().topic().asString();
for (Map.Entry<Matcher, String> item : paths.entrySet())
{
Matcher matcher = item.getKey();
matcher.reset(topic);
if (matcher.find())
{
String channelName = item.getValue();
operationId = sendOperationIds.get(channelName);
break;
}
}
break;
case MqttBeginExFW.KIND_SUBSCRIBE:
topic = mqttBeginEx.subscribe().filters().matchFirst(x -> true).pattern().asString();
for (Map.Entry<Matcher, String> item : paths.entrySet())
{
Matcher matcher = item.getKey();
matcher.reset(topic);
if (matcher.find())
{
String channelName = item.getValue();
operationId = receiveOperationIds.get(channelName);
break;
}
}
break;
}

return operationId;
}

public String resolveHttpOperationId(
HttpBeginExFW httpBeginEx)
{
helper.visit(httpBeginEx);
Expand All @@ -161,6 +211,26 @@ public String resolveOperationId(
return operationId;
}

public String resolveSseOperationId(
SseBeginExFW sseBeginEx)
{
String operationId = null;

for (Map.Entry<Matcher, String> item : paths.entrySet())
{
Matcher matcher = item.getKey();
matcher.reset(sseBeginEx.path().asString());
if (matcher.find())
{
String channelName = item.getValue();
operationId = operationIds.get(channelName);
break;
}
}

return operationId;
}

public AsyncapiRouteConfig resolve(
long authorization)
{
Expand Down Expand Up @@ -194,33 +264,34 @@ public void attach(
attachServerClientBinding(binding, configs);
}

for (Map.Entry<Integer, NamespaceConfig> entry : composites.entrySet())
for (Map.Entry<Integer, CompositeNamespace> entry : composites.entrySet())
{
Integer k = entry.getKey();
NamespaceConfig v = entry.getValue();
CompositeNamespace v = entry.getValue();
NamespaceConfig namespaceConfig = v.composite;
List<BindingConfig> bindings;
boolean containsSse = v.bindings.stream().anyMatch(b -> b.type.equals("sse"));
boolean containsSse = namespaceConfig.bindings.stream().anyMatch(b -> b.type.equals("sse"));
if (containsSse)
{
if (binding.kind.equals(SERVER))
{
bindings = v.bindings.stream()
bindings = namespaceConfig.bindings.stream()
.filter(b -> b.type.equals("http") || b.type.equals("http-kafka") || b.type.equals("sse"))
.collect(toList());
}
else
{
bindings = v.bindings.stream()
bindings = namespaceConfig.bindings.stream()
.filter(b -> b.type.equals("sse"))
.collect(toList());
}
}
else
{
bindings = v.bindings.stream()
bindings = namespaceConfig.bindings.stream()
.filter(b -> b.type.equals("mqtt") || b.type.equals("http") || b.type.equals("sse") ||
b.type.equals("kafka") && b.kind == CACHE_CLIENT || b.type.equals("mqtt-kafka") ||
b.type.equals("http-kafka"))
b.type.equals("http-kafka") || b.type.equals("sse-kafka"))
.collect(toList());
}

Expand All @@ -231,7 +302,7 @@ public void attach(

public void detach()
{
composites.forEach((k, v) -> detach.accept(v));
composites.forEach((k, v) -> detach.accept(v.composite));
composites.clear();
}

Expand Down Expand Up @@ -291,7 +362,7 @@ private void updateNamespace(
{
configs.forEach(c ->
{
composites.put(c.schemaId, composite);
composites.put(c.schemaId, new CompositeNamespace(composite, c.asyncapi.operations.keySet()));
schemaIdsByApiId.put(c.apiLabel, c.schemaId);
});
asyncapis.forEach(this::extractChannels);
Expand All @@ -314,7 +385,11 @@ private void extractResolveId(
int schemaId,
List<BindingConfig> bindings)
{
bindings.forEach(b -> compositeResolvedIds.put(schemaId, b.id));
bindings.forEach(b ->
{
String operationType = b.type.replace("-kafka", "");
compositeResolvedIds.put(schemaId + operationType, b.id);
});
}

private void extractOperations(
Expand All @@ -324,6 +399,16 @@ private void extractOperations(
{
String[] refParts = v.channel.ref.split("/");
operationIds.put(refParts[refParts.length - 1], k);
if (SEND_OPERATION.equals(v.action))
{
sendOperationIds.put(refParts[refParts.length - 1], k);
}
else if (RECEIVE_OPERATION.equals(v.action))
{
receiveOperationIds.put(refParts[refParts.length - 1], k);
}

operationBindings.put(k, v.bindings);
});
}

Expand Down Expand Up @@ -443,4 +528,18 @@ private void visitAuthority(
authority = authorityRO.wrap(value.buffer(), value.offset(), value.limit());
}
}

static class CompositeNamespace
{
NamespaceConfig composite;
Set<String> operations;

CompositeNamespace(
NamespaceConfig composite,
Set<String> operations)
{
this.composite = composite;
this.operations = operations;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected <C> BindingConfigBuilder<C> injectProxyRoutes(
for (AsyncapiConditionConfig condition : route.when)
{
final Asyncapi httpAsyncapi = asyncapis.get(condition.apiId);
if (httpAsyncapi.servers.values().stream().anyMatch(s -> !s.protocol.startsWith(ASYNCAPI_HTTP_PROTOCOL_NAME)))
if (httpAsyncapi.servers.values().stream().noneMatch(s -> s.protocol.startsWith(ASYNCAPI_HTTP_PROTOCOL_NAME)))
{
break inject;
}
Expand Down Expand Up @@ -115,47 +115,50 @@ private <C> BindingConfigBuilder<C> addHttpKafkaRoute(

final AsyncapiChannelView channel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel);
String path = channel.address();
String method = whenOperation.bindings.get("http").method;
final List<String> paramNames = findParams(path);
if (whenOperation.bindings != null)
{
String method = whenOperation.bindings.get("http").method;
final List<String> paramNames = findParams(path);

AsyncapiChannelView httpChannel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel);
AsyncapiChannelView httpChannel = AsyncapiChannelView.of(httpAsyncapi.channels, whenOperation.channel);

boolean async = httpChannel.messages().values()
.stream().anyMatch(asyncapiMessage ->
{
AsyncapiMessageView message =
AsyncapiMessageView.of(httpAsyncapi.components.messages, asyncapiMessage);
return message.correlationId() != null;
});
boolean async = httpChannel.messages().values()
.stream().anyMatch(asyncapiMessage ->
{
AsyncapiMessageView message =
AsyncapiMessageView.of(httpAsyncapi.components.messages, asyncapiMessage);
return message.correlationId() != null;
});

if (async)
{
for (AsyncapiOperation operation : httpAsyncapi.operations.values())
if (async)
{
AsyncapiChannelView channelView = AsyncapiChannelView.of(httpAsyncapi.channels, operation.channel);
if (parameters.reset(channelView.address()).find())
for (AsyncapiOperation operation : httpAsyncapi.operations.values())
{
AsyncapiReply reply = withOperation.reply;
if (reply != null)
AsyncapiChannelView channelView = AsyncapiChannelView.of(httpAsyncapi.channels, operation.channel);
if (parameters.reset(channelView.address()).find())
{
final RouteConfigBuilder<BindingConfigBuilder<C>> asyncRouteBuilder = binding.route();
binding = addAsyncOperation(asyncRouteBuilder, httpAsyncapi, kafkaAsyncapi, operation,
withOperation);
AsyncapiReply reply = withOperation.reply;
if (reply != null)
{
final RouteConfigBuilder<BindingConfigBuilder<C>> asyncRouteBuilder = binding.route();
binding = addAsyncOperation(asyncRouteBuilder, httpAsyncapi, kafkaAsyncapi, operation,
withOperation);
}
}
}
}
}

final RouteConfigBuilder<BindingConfigBuilder<C>> routeBuilder = binding.route();
routeBuilder
.exit(qname)
.when(HttpKafkaConditionConfig::builder)
.method(method)
.path(path)
.build()
.inject(r -> injectHttpKafkaRouteWith(r, httpAsyncapi, kafkaAsyncapi, whenOperation,
withOperation, paramNames));
binding = routeBuilder.build();
final RouteConfigBuilder<BindingConfigBuilder<C>> routeBuilder = binding.route();
routeBuilder
.exit(qname)
.when(HttpKafkaConditionConfig::builder)
.method(method)
.path(path)
.build()
.inject(r -> injectHttpKafkaRouteWith(r, httpAsyncapi, kafkaAsyncapi, whenOperation,
withOperation, paramNames));
binding = routeBuilder.build();
}
return binding;
}

Expand Down
Loading