Skip to content

Commit

Permalink
Support local logging of events (#755)
Browse files Browse the repository at this point in the history
  • Loading branch information
attilakreiner authored Feb 28, 2024
1 parent d0a8f21 commit 77be54a
Show file tree
Hide file tree
Showing 121 changed files with 5,832 additions and 347 deletions.
6 changes: 6 additions & 0 deletions cloud/docker-image/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>exporter-stdout</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>metrics-stream</artifactId>
Expand Down
3 changes: 2 additions & 1 deletion cloud/docker-image/src/main/docker/zpm.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@
"io.aklivity.zilla:command-stop",
"io.aklivity.zilla:command-tune",
"io.aklivity.zilla:engine",
"io.aklivity.zilla:exporter-prometheus",
"io.aklivity.zilla:exporter-otlp",
"io.aklivity.zilla:exporter-prometheus",
"io.aklivity.zilla:exporter-stdout",
"io.aklivity.zilla:guard-jwt",
"io.aklivity.zilla:metrics-stream",
"io.aklivity.zilla:metrics-http",
Expand Down
20 changes: 20 additions & 0 deletions incubator/catalog-schema-registry.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@
<groupId>org.jasig.maven</groupId>
<artifactId>maven-notice-plugin</artifactId>
</plugin>
<plugin>
<groupId>${project.groupId}</groupId>
<artifactId>flyweight-maven-plugin</artifactId>
<version>${project.version}</version>
<configuration>
<scopeNames>core schema_registry</scopeNames>
<packageName>io.aklivity.zilla.specs.catalog.schema.registry.internal.types</packageName>
</configuration>
<executions>
<execution>
<goals>
<goal>validate</goal>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
Expand All @@ -86,6 +103,9 @@
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>io/aklivity/zilla/specs/catalog/schema/registry/internal/types/**/*.class</exclude>
</excludes>
<rules>
<rule>
<element>BUNDLE</element>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
scope schema_registry
{
scope event
{
enum SchemaRegistryEventType (uint8)
{
REMOTE_ACCESS_REJECTED (1)
}

struct SchemaRegistryRemoteAccessRejected extends core::event::Event
{
string8 method;
string16 url;
int16 status;
}

union SchemaRegistryEvent switch (SchemaRegistryEventType)
{
case REMOTE_ACCESS_REJECTED: SchemaRegistryRemoteAccessRejected remoteAccessRejected;
}
}
}
12 changes: 6 additions & 6 deletions incubator/catalog-schema-registry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.90</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.80</jacoco.coverage.ratio>
<jacoco.missed.count>0</jacoco.missed.count>
</properties>

Expand Down Expand Up @@ -71,16 +71,12 @@
<groupId>org.jasig.maven</groupId>
<artifactId>maven-notice-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>${project.groupId}</groupId>
<artifactId>flyweight-maven-plugin</artifactId>
<version>${project.version}</version>
<configuration>
<scopeNames>internal</scopeNames>
<scopeNames>core schema_registry internal</scopeNames>
<packageName>io.aklivity.zilla.runtime.catalog.schema.registry.internal.types</packageName>
</configuration>
<executions>
Expand All @@ -91,6 +87,10 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public String name()
public CatalogContext supply(
EngineContext context)
{
return new SchemaRegistryCatalogContext();
return new SchemaRegistryCatalogContext(context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,25 @@
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryOptionsConfig;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.CatalogConfig;

public class SchemaRegistryCatalogContext implements CatalogContext
{
private final EngineContext context;

public SchemaRegistryCatalogContext(
EngineContext context)
{
this.context = context;
}

@Override
public CatalogHandler attach(
CatalogConfig catalog)
{
return new SchemaRegistryCatalogHandler(SchemaRegistryOptionsConfig.class.cast(catalog.options));
return new SchemaRegistryCatalogHandler(SchemaRegistryOptionsConfig.class.cast(catalog.options), context, catalog.id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryOptionsConfig;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.RegisterSchemaRequest;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.SchemaRegistryPrefixFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;

Expand All @@ -51,9 +52,13 @@ public class SchemaRegistryCatalogHandler implements CatalogHandler
private final Int2ObjectCache<String> schemas;
private final Int2ObjectCache<CachedSchemaId> schemaIds;
private final long maxAgeMillis;
private final SchemaRegistryEventContext event;
private final long catalogId;

public SchemaRegistryCatalogHandler(
SchemaRegistryOptionsConfig config)
SchemaRegistryOptionsConfig config,
EngineContext context,
long catalogId)
{
this.baseUrl = config.url;
this.client = HttpClient.newHttpClient();
Expand All @@ -62,6 +67,8 @@ public SchemaRegistryCatalogHandler(
this.schemas = new Int2ObjectCache<>(1, 1024, i -> {});
this.schemaIds = new Int2ObjectCache<>(1, 1024, i -> {});
this.maxAgeMillis = config.maxAge.toMillis();
this.event = new SchemaRegistryEventContext(context);
this.catalogId = catalogId;
}

@Override
Expand Down Expand Up @@ -209,12 +216,18 @@ private String sendHttpRequest(

try
{
HttpResponse<String> response = client.send(httpRequest, HttpResponse.BodyHandlers.ofString());
return response.statusCode() == 200 ? response.body() : null;
HttpResponse<String> httpResponse = client.send(httpRequest, HttpResponse.BodyHandlers.ofString());
boolean success = httpResponse.statusCode() == 200;
String responseBody = success ? httpResponse.body() : null;
if (!success)
{
event.remoteAccessRejected(catalogId, httpRequest, httpResponse.statusCode());
}
return responseBody;
}
catch (Exception ex)
{
ex.printStackTrace(System.out);
event.remoteAccessRejected(catalogId, httpRequest, 0);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;

import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.time.Clock;

import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.SchemaRegistryEventFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;

public class SchemaRegistryEventContext
{
private static final int EVENT_BUFFER_CAPACITY = 1024;

private final SchemaRegistryEventFW.Builder schemaRegistryEventRW = new SchemaRegistryEventFW.Builder();
private final MutableDirectBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final int schemaRegistryTypeId;
private final MessageConsumer eventWriter;
private final Clock clock;

public SchemaRegistryEventContext(
EngineContext context)
{
this.schemaRegistryTypeId = context.supplyTypeId(SchemaRegistryCatalog.NAME);
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}

public void remoteAccessRejected(
long catalogId,
HttpRequest httpRequest,
int status)
{
SchemaRegistryEventFW event = schemaRegistryEventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.remoteAccessRejected(e -> e
.timestamp(clock.millis())
.traceId(0L)
.namespacedId(catalogId)
.method(httpRequest.method())
.url(httpRequest.uri().toString())
.status((short) status)
)
.build();
eventWriter.accept(schemaRegistryTypeId, event.buffer(), event.offset(), event.limit());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.rules.RuleChain.outerRule;
import static org.mockito.Mockito.mock;

import java.time.Duration;

Expand All @@ -35,6 +36,7 @@
import org.kaazing.k3po.junit.rules.K3poRule;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryOptionsConfig;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;

Expand All @@ -49,6 +51,7 @@ public class SchemaRegistryIT
public final TestRule chain = outerRule(k3po).around(timeout);

private SchemaRegistryOptionsConfig config;
private EngineContext context = mock(EngineContext.class);

@Before
public void setup()
Expand All @@ -69,7 +72,7 @@ public void shouldResolveSchemaViaSchemaId() throws Exception
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

String schema = catalog.resolve(9);

Expand All @@ -88,7 +91,7 @@ public void shouldResolveSchemaViaSubjectVersion() throws Exception
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

int schemaId = catalog.resolve("items-snapshots-value", "latest");

Expand All @@ -109,7 +112,7 @@ public void shouldRegisterSchema() throws Exception
String schema = "{\"type\": \"record\",\"name\": \"test\",\"fields\":[{\"type\": \"string\",\"name\": \"field1\"}," +
"{\"type\": \"com.acme.Referenced\",\"name\": \"int\"}]}";

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

int schemaId = catalog.register("items-snapshots-value", "avro", schema);

Expand All @@ -128,7 +131,7 @@ public void shouldResolveSchemaViaSchemaIdFromCache() throws Exception
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

catalog.resolve(9);

Expand All @@ -149,7 +152,7 @@ public void shouldResolveSchemaViaSubjectVersionFromCache() throws Exception
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

catalog.resolve(catalog.resolve("items-snapshots-value", "latest"));

Expand All @@ -167,15 +170,15 @@ public void shouldResolveSchemaViaSubjectVersionFromCache() throws Exception
@Test
public void shouldVerifyMaxPadding()
{
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

assertEquals(5, catalog.encodePadding());
}

@Test
public void shouldVerifyEncodedData()
{
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

DirectBuffer data = new UnsafeBuffer();

Expand All @@ -191,7 +194,7 @@ public void shouldVerifyEncodedData()
public void shouldResolveSchemaIdAndProcessData()
{

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

DirectBuffer data = new UnsafeBuffer();

Expand All @@ -207,7 +210,7 @@ public void shouldResolveSchemaIdAndProcessData()
@Test
public void shouldResolveSchemaIdFromData()
{
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

DirectBuffer data = new UnsafeBuffer();

Expand Down
Loading

0 comments on commit 77be54a

Please sign in to comment.