Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support karapace catalog #893

Merged
merged 8 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cloud/docker-image/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>catalog-schema-registry</artifactId>
<artifactId>catalog-karapace</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion cloud/docker-image/src/main/docker/zpm.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"io.aklivity.zilla:binding-ws",
"io.aklivity.zilla:catalog-apicurio",
"io.aklivity.zilla:catalog-inline",
"io.aklivity.zilla:catalog-schema-registry",
"io.aklivity.zilla:catalog-karapace",
"io.aklivity.zilla:common",
"io.aklivity.zilla:command",
"io.aklivity.zilla:command-dump",
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion incubator/exporter-stdout/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
<artifactId>flyweight-maven-plugin</artifactId>
<version>${project.version}</version>
<configuration>
<scopeNames>core http jwt kafka schema_registry tcp tls</scopeNames>
<scopeNames>core http jwt kafka karapace tcp tls</scopeNames>
<packageName>io.aklivity.zilla.runtime.exporter.stdout.internal.types</packageName>
</configuration>
<executions>
Expand Down
7 changes: 0 additions & 7 deletions incubator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
<module>binding-openapi-asyncapi.spec</module>
<module>catalog-apicurio.spec</module>
<module>catalog-inline.spec</module>
<module>catalog-schema-registry.spec</module>
<module>exporter-otlp.spec</module>
<module>exporter-stdout.spec</module>
<module>model-avro.spec</module>
Expand All @@ -38,7 +37,6 @@

<module>catalog-apicurio</module>
<module>catalog-inline</module>
<module>catalog-schema-registry</module>

<module>command-log</module>
<module>command-dump</module>
Expand Down Expand Up @@ -86,11 +84,6 @@
<artifactId>catalog-inline</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>catalog-schema-registry</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>command-log</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<artifactId>runtime</artifactId>
<version>develop-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>catalog-schema-registry</artifactId>
<name>zilla::incubator::catalog-schema-registry</name>
<artifactId>catalog-karapace</artifactId>
<name>zilla::runtime::catalog-karapace</name>

<licenses>
<license>
Expand All @@ -31,7 +31,7 @@
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>catalog-schema-registry.spec</artifactId>
<artifactId>catalog-karapace.spec</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
Expand Down Expand Up @@ -76,8 +76,8 @@
<artifactId>flyweight-maven-plugin</artifactId>
<version>${project.version}</version>
<configuration>
<scopeNames>core schema_registry internal</scopeNames>
<packageName>io.aklivity.zilla.runtime.catalog.schema.registry.internal.types</packageName>
<scopeNames>core karapace internal</scopeNames>
<packageName>io.aklivity.zilla.runtime.catalog.karapace.internal.types</packageName>
</configuration>
<executions>
<execution>
Expand Down Expand Up @@ -106,16 +106,16 @@
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>catalog-schema-registry.spec</artifactId>
<artifactId>catalog-karapace.spec</artifactId>
<fileMappers>
<org.codehaus.plexus.components.io.filemappers.RegExpFileMapper>
<pattern>^\Qio/aklivity/zilla/specs/catalog/schema/registry/\E</pattern>
<replacement>io/aklivity/zilla/runtime/catalog/schema/registry/internal/</replacement>
<pattern>^\Qio/aklivity/zilla/specs/catalog/karapace/\E</pattern>
<replacement>io/aklivity/zilla/runtime/catalog/karapace/internal/</replacement>
</org.codehaus.plexus.components.io.filemappers.RegExpFileMapper>
</fileMappers>
</artifactItem>
</artifactItems>
<includes>io/aklivity/zilla/specs/catalog/schema/registry/schema/schema.registry.schema.patch.json</includes>
<includes>io/aklivity/zilla/specs/catalog/karapace/*/*</includes>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
</configuration>
</execution>
Expand All @@ -142,7 +142,7 @@
<artifactId>jacoco-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>io/aklivity/zilla/runtime/catalog/schema/registry/internal/types/**/*.class</exclude>
<exclude>io/aklivity/zilla/runtime/catalog/karapace/internal/types/**/*.class</exclude>
</excludes>
<rules>
<rule>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;
package io.aklivity.zilla.runtime.catalog.karapace.internal;

public class CachedSchemaId
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;
package io.aklivity.zilla.runtime.catalog.karapace.internal;

import java.net.URL;

Expand All @@ -21,31 +21,31 @@
import io.aklivity.zilla.runtime.engine.catalog.Catalog;
import io.aklivity.zilla.runtime.engine.catalog.CatalogContext;

public class SchemaRegistryCatalog implements Catalog
public class KarapaceCatalog implements Catalog
{
public static final String NAME = "schema-registry";
public static final String NAME = "karapace";

public SchemaRegistryCatalog(
public KarapaceCatalog(
Configuration config)
{
}

@Override
public String name()
{
return SchemaRegistryCatalog.NAME;
return KarapaceCatalog.NAME;
}

@Override
public CatalogContext supply(
EngineContext context)
{
return new SchemaRegistryCatalogContext(context);
return new KarapaceCatalogContext(context);
}

@Override
public URL type()
{
return getClass().getResource("schema/schema.registry.schema.patch.json");
return getClass().getResource("schema/karapace.schema.patch.json");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;
package io.aklivity.zilla.runtime.catalog.karapace.internal;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryOptionsConfig;
import io.aklivity.zilla.runtime.catalog.karapace.internal.config.KarapaceOptionsConfig;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.CatalogConfig;

public class SchemaRegistryCatalogContext implements CatalogContext
public class KarapaceCatalogContext implements CatalogContext
{
private final EngineContext context;

public SchemaRegistryCatalogContext(
public KarapaceCatalogContext(
EngineContext context)
{
this.context = context;
Expand All @@ -34,6 +34,6 @@ public SchemaRegistryCatalogContext(
public CatalogHandler attach(
CatalogConfig catalog)
{
return new SchemaRegistryCatalogHandler(SchemaRegistryOptionsConfig.class.cast(catalog.options), context, catalog.id);
return new KarapaceCatalogHandler(KarapaceOptionsConfig.class.cast(catalog.options), context, catalog.id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,24 @@
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;
package io.aklivity.zilla.runtime.catalog.karapace.internal;

import io.aklivity.zilla.runtime.common.feature.Incubating;
import io.aklivity.zilla.runtime.engine.Configuration;
import io.aklivity.zilla.runtime.engine.catalog.Catalog;
import io.aklivity.zilla.runtime.engine.catalog.CatalogFactorySpi;

@Incubating
public class SchemaRegistryCatalogFactorySpi implements CatalogFactorySpi
public class KarapaceCatalogFactorySpi implements CatalogFactorySpi
{
@Override
public String type()
{
return SchemaRegistryCatalog.NAME;
return KarapaceCatalog.NAME;
}

@Override
public Catalog create(
Configuration config)
{
return new SchemaRegistryCatalog(config);
return new KarapaceCatalog(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;
package io.aklivity.zilla.runtime.catalog.karapace.internal;

import java.net.URI;
import java.net.http.HttpClient;
Expand All @@ -27,22 +27,22 @@
import org.agrona.collections.Int2ObjectCache;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryOptionsConfig;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.RegisterSchemaRequest;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.SchemaRegistryPrefixFW;
import io.aklivity.zilla.runtime.catalog.karapace.internal.config.KarapaceOptionsConfig;
import io.aklivity.zilla.runtime.catalog.karapace.internal.serializer.RegisterSchemaRequest;
import io.aklivity.zilla.runtime.catalog.karapace.internal.types.KarapacePrefixFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;

public class SchemaRegistryCatalogHandler implements CatalogHandler
public class KarapaceCatalogHandler implements CatalogHandler
{
private static final String SUBJECT_VERSION_PATH = "/subjects/{0}/versions/{1}";
private static final String SCHEMA_PATH = "/schemas/ids/{0}";
private static final String REGISTER_SCHEMA_PATH = "/subjects/{0}/versions";
private static final int MAX_PADDING_LENGTH = 5;
private static final byte MAGIC_BYTE = 0x0;

private final SchemaRegistryPrefixFW.Builder prefixRW = new SchemaRegistryPrefixFW.Builder()
private final KarapacePrefixFW.Builder prefixRW = new KarapacePrefixFW.Builder()
.wrap(new UnsafeBuffer(new byte[5]), 0, 5);

private final HttpClient client;
Expand All @@ -52,11 +52,11 @@ public class SchemaRegistryCatalogHandler implements CatalogHandler
private final Int2ObjectCache<String> schemas;
private final Int2ObjectCache<CachedSchemaId> schemaIds;
private final long maxAgeMillis;
private final SchemaRegistryEventContext event;
private final KarapaceEventContext event;
private final long catalogId;

public SchemaRegistryCatalogHandler(
SchemaRegistryOptionsConfig config,
public KarapaceCatalogHandler(
KarapaceOptionsConfig config,
EngineContext context,
long catalogId)
{
Expand All @@ -67,7 +67,7 @@ public SchemaRegistryCatalogHandler(
this.schemas = new Int2ObjectCache<>(1, 1024, i -> {});
this.schemaIds = new Int2ObjectCache<>(1, 1024, i -> {});
this.maxAgeMillis = config.maxAge.toMillis();
this.event = new SchemaRegistryEventContext(context);
this.event = new KarapaceEventContext(context);
this.catalogId = catalogId;
}

Expand Down Expand Up @@ -169,7 +169,7 @@ public int encode(
ValueConsumer next,
Encoder encoder)
{
SchemaRegistryPrefixFW prefix = prefixRW.rewrap().schemaId(schemaId).build();
KarapacePrefixFW prefix = prefixRW.rewrap().schemaId(schemaId).build();
next.accept(prefix.buffer(), prefix.offset(), prefix.sizeof());
int valLength = encoder.accept(traceId, bindingId, schemaId, data, index, length, next);
return valLength > 0 ? prefix.sizeof() + valLength : -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;
package io.aklivity.zilla.runtime.catalog.karapace.internal;

import static io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.SchemaRegistryEventType.REMOTE_ACCESS_REJECTED;

import static io.aklivity.zilla.runtime.catalog.karapace.internal.types.event.KarapaceEventType.REMOTE_ACCESS_REJECTED;

import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
Expand All @@ -23,29 +24,29 @@
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.SchemaRegistryEventExFW;
import io.aklivity.zilla.runtime.catalog.karapace.internal.types.event.EventFW;
import io.aklivity.zilla.runtime.catalog.karapace.internal.types.event.KarapaceEventExFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;

public class SchemaRegistryEventContext
public class KarapaceEventContext
{
private static final int EVENT_BUFFER_CAPACITY = 1024;

private final AtomicBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final AtomicBuffer extensionBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final EventFW.Builder eventRW = new EventFW.Builder();
private final SchemaRegistryEventExFW.Builder schemaRegistryEventExRW = new SchemaRegistryEventExFW.Builder();
private final int schemaRegistryTypeId;
private final KarapaceEventExFW.Builder karapaceEventExRW = new KarapaceEventExFW.Builder();
private final int karapaceTypeId;
private final int remoteAccessRejectedEventId;
private final MessageConsumer eventWriter;
private final Clock clock;

public SchemaRegistryEventContext(
public KarapaceEventContext(
EngineContext context)
{
this.schemaRegistryTypeId = context.supplyTypeId(SchemaRegistryCatalog.NAME);
this.remoteAccessRejectedEventId = context.supplyEventId("catalog.schema.registry.remote.access.rejected");
this.karapaceTypeId = context.supplyTypeId(KarapaceCatalog.NAME);
this.remoteAccessRejectedEventId = context.supplyEventId("catalog.karapace.remote.access.rejected");
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}
Expand All @@ -55,7 +56,7 @@ public void remoteAccessRejected(
HttpRequest httpRequest,
int status)
{
SchemaRegistryEventExFW extension = schemaRegistryEventExRW
KarapaceEventExFW extension = karapaceEventExRW
.wrap(extensionBuffer, 0, extensionBuffer.capacity())
.remoteAccessRejected(e -> e
.typeId(REMOTE_ACCESS_REJECTED.value())
Expand All @@ -72,6 +73,6 @@ public void remoteAccessRejected(
.namespacedId(catalogId)
.extension(extension.buffer(), extension.offset(), extension.limit())
.build();
eventWriter.accept(schemaRegistryTypeId, event.buffer(), event.offset(), event.limit());
eventWriter.accept(karapaceTypeId, event.buffer(), event.offset(), event.limit());
}
}
Loading