Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support local logging of events #755

Merged
merged 169 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
169 commits
Select commit Hold shift + click to select a range
c7f4627
POC
attilakreiner Jan 22, 2024
1a7592f
fix
attilakreiner Jan 23, 2024
3609baa
WIP HttpEventContext
attilakreiner Jan 23, 2024
d4a6ec6
WIP core Event
attilakreiner Jan 23, 2024
ddef06c
WIP EngineWorker logEvent
attilakreiner Jan 23, 2024
a4399ca
WIP code BindingEvent
attilakreiner Jan 23, 2024
66202a9
WIP h2
attilakreiner Jan 23, 2024
d67cf9e
WIP identity
attilakreiner Jan 24, 2024
95aefb3
WIP mqtt
attilakreiner Jan 24, 2024
e137363
fix
attilakreiner Jan 24, 2024
7cd659f
fix
attilakreiner Jan 24, 2024
3c40746
WIP MessageConsumer
attilakreiner Jan 24, 2024
e58cd98
WIP SchemaRegistryEventContext
attilakreiner Jan 25, 2024
e6a4c09
WIP TcpEventContext
attilakreiner Jan 25, 2024
2916bed
WIP tls
attilakreiner Jan 26, 2024
35a723c
WIP kafka
attilakreiner Jan 26, 2024
d304db7
WIP kafka apiversion
attilakreiner Jan 26, 2024
5d3d78b
mv schmea_registry.idl
attilakreiner Jan 29, 2024
4b86a93
WIP fix rm Result
attilakreiner Jan 29, 2024
d18a716
ref lvl
attilakreiner Jan 29, 2024
66ea78f
Add catalogId
attilakreiner Jan 29, 2024
10663a8
fix rm initialId
attilakreiner Jan 29, 2024
adba065
fix 1
attilakreiner Jan 30, 2024
5504201
fix 2
attilakreiner Jan 30, 2024
bd145f6
fix 3
attilakreiner Jan 30, 2024
2162630
fix 4
attilakreiner Jan 30, 2024
1b9d779
fix 5
attilakreiner Jan 30, 2024
f2e867b
fix 6
attilakreiner Jan 30, 2024
6af5130
fix 7
attilakreiner Jan 30, 2024
4efa893
WIP kafka api version
attilakreiner Jan 31, 2024
a8e6e8c
Merge branch 'develop' into log
attilakreiner Feb 5, 2024
3be3ed3
fix
attilakreiner Feb 5, 2024
377ca3c
ref proxy.idl to tls.idl
attilakreiner Feb 5, 2024
45d33e4
ref proxy.idl to tcp.idl
attilakreiner Feb 5, 2024
ded8235
fix
attilakreiner Feb 5, 2024
431300d
fix
attilakreiner Feb 5, 2024
33d1745
WIP EventsLayout
attilakreiner Feb 5, 2024
f8e4ced
WIP timestamp
attilakreiner Feb 5, 2024
a8d6b19
WIP StdoutExporter
attilakreiner Feb 6, 2024
a675aac
WIP EventsLayout
attilakreiner Feb 7, 2024
da36031
WIP spy
attilakreiner Feb 7, 2024
907aacc
WIP PrintableEventsStream
attilakreiner Feb 8, 2024
b972881
fix
attilakreiner Feb 8, 2024
5083758
fix
attilakreiner Feb 8, 2024
168cad4
fix
attilakreiner Feb 8, 2024
54447ad
fix capacity
attilakreiner Feb 8, 2024
a04f78f
WIP EventsLayoutReader rotate
attilakreiner Feb 8, 2024
056a1a8
fix StdoutEventsStream
attilakreiner Feb 9, 2024
9ff65ea
fix schema
attilakreiner Feb 9, 2024
c3b8844
fix typeId
attilakreiner Feb 9, 2024
ada8412
fix logger
attilakreiner Feb 9, 2024
c0f74bb
fix authorization
attilakreiner Feb 9, 2024
62ed116
fix namespacedId
attilakreiner Feb 9, 2024
cd263c6
fix supply*
attilakreiner Feb 9, 2024
f8c0174
fix logger
attilakreiner Feb 9, 2024
e33e09b
fix
attilakreiner Feb 9, 2024
7a8e33e
fix
attilakreiner Feb 9, 2024
c712d7b
fix lookupLabelId
attilakreiner Feb 9, 2024
2c0ea9f
fix
attilakreiner Feb 9, 2024
be4fa06
WIP ref
attilakreiner Feb 9, 2024
786d434
WIP spies
attilakreiner Feb 9, 2024
d0f8585
WIP spies
attilakreiner Feb 9, 2024
c5a482c
WIP spies
attilakreiner Feb 12, 2024
1c31152
WIP spies
attilakreiner Feb 12, 2024
b934025
WIP StdoutEventsStream
attilakreiner Feb 12, 2024
5fe3ab4
Merge branch 'develop' into log
attilakreiner Feb 12, 2024
be6486b
add EventsLayoutTest
attilakreiner Feb 12, 2024
8824343
fix pom
attilakreiner Feb 12, 2024
bfa4aca
add StdoutExporterHandlerTest
attilakreiner Feb 12, 2024
62f976e
fix
attilakreiner Feb 12, 2024
624e7ef
fix
attilakreiner Feb 13, 2024
3f9b66c
fix
attilakreiner Feb 13, 2024
8f39a77
fix onDecodeResponseErrorCode
attilakreiner Feb 13, 2024
0f57f30
rm accessDeinied
attilakreiner Feb 13, 2024
393b00b
fix internal
attilakreiner Feb 13, 2024
705ad99
cleanup
attilakreiner Feb 13, 2024
8d63a5d
fix EventReader
attilakreiner Feb 13, 2024
b7dfcd9
cleanup
attilakreiner Feb 13, 2024
7ef306d
fix EventHandler
attilakreiner Feb 13, 2024
4a5a7f6
fix
attilakreiner Feb 13, 2024
633ed14
fix tcp
attilakreiner Feb 13, 2024
1289396
fix
attilakreiner Feb 13, 2024
c47719f
fix
attilakreiner Feb 14, 2024
ed04985
WIP RingBufferSpy
attilakreiner Feb 15, 2024
db2cf89
Revert "WIP RingBufferSpy"
attilakreiner Feb 16, 2024
6b74346
WIP eventReadingQueue
attilakreiner Feb 15, 2024
29d06f3
WIP sortEventIndicesByTimestamps
attilakreiner Feb 16, 2024
5740ead
test stdout
attilakreiner Feb 16, 2024
0acc7f4
fix Engine readEvent
attilakreiner Feb 19, 2024
baf1c19
WIP http server log
attilakreiner Feb 19, 2024
39e737d
rm level
attilakreiner Feb 20, 2024
da6cb7b
fix onDecodeResponseErrorCode
attilakreiner Feb 20, 2024
05db0f4
fix tcp remoteAddress
attilakreiner Feb 20, 2024
cf554b6
fix tcp traceId doNetConnect
attilakreiner Feb 20, 2024
776c2ec
fix tcp traceId 0 onNetConnect
attilakreiner Feb 20, 2024
10cfdfc
fix mv MessageReader
attilakreiner Feb 20, 2024
f302779
fix Engine supplyEventReader
attilakreiner Feb 20, 2024
035589e
fix mv lookupTypeId
attilakreiner Feb 20, 2024
ceb2efc
fix mv remoteaddressrejected
attilakreiner Feb 20, 2024
d2665ac
fix mv supplyEventWriter
attilakreiner Feb 20, 2024
7415d67
fix mv tcp.idl
attilakreiner Feb 20, 2024
68e1b0c
fix mv http.idl
attilakreiner Feb 20, 2024
e888bcf
fix mv kafka.idl
attilakreiner Feb 20, 2024
730ecaf
fix mv mqtt.idl
attilakreiner Feb 20, 2024
4c4b0e6
fix mv schema_registry.idl
attilakreiner Feb 20, 2024
f359b76
fix mv tls.idl
attilakreiner Feb 20, 2024
154f0ef
fix Clock
attilakreiner Feb 20, 2024
31f69a8
fix EventReader inner class
attilakreiner Feb 20, 2024
853f794
fix timestamp
attilakreiner Feb 20, 2024
4f9009d
fix kafka
attilakreiner Feb 21, 2024
68d6ffd
fix tcp
attilakreiner Feb 21, 2024
856d40d
fix engine
attilakreiner Feb 21, 2024
e2a4edf
fix stdout exp
attilakreiner Feb 21, 2024
6559700
fix tcp
attilakreiner Feb 21, 2024
7261233
fix supplyTypeId
attilakreiner Feb 21, 2024
fa8db9b
fix tls
attilakreiner Feb 21, 2024
df71e7f
fix rm http response
attilakreiner Feb 21, 2024
d766096
fix http auth failure
attilakreiner Feb 21, 2024
de6cdaf
fix kafka auth failure
attilakreiner Feb 21, 2024
c57b3d2
fix mqtt auth failure
attilakreiner Feb 21, 2024
d681727
fix http log format
attilakreiner Feb 21, 2024
cbb6316
fix kafka log format
attilakreiner Feb 21, 2024
11790f1
fix mqtt log format
attilakreiner Feb 21, 2024
c802e2c
fix tcp log format
attilakreiner Feb 21, 2024
41a8d3b
fix tls log format
attilakreiner Feb 21, 2024
cc4987c
fix schreg log format
attilakreiner Feb 21, 2024
71a6818
fix revert schreg
attilakreiner Feb 22, 2024
02f3b7b
fix identity
attilakreiner Feb 22, 2024
0532c2c
fix rm traceId
attilakreiner Feb 22, 2024
4736c14
fix rm event type name
attilakreiner Feb 22, 2024
af555ce
fix http
attilakreiner Feb 22, 2024
787424b
fix kafka
attilakreiner Feb 22, 2024
74ee442
fix mqtt
attilakreiner Feb 22, 2024
43c7b7c
fix date time format
attilakreiner Feb 22, 2024
64ccd2c
fix const
attilakreiner Feb 22, 2024
37dc1fb
fix supplyQName
attilakreiner Feb 22, 2024
96f0866
fix Stdout*Handler
attilakreiner Feb 22, 2024
12afe60
fix context
attilakreiner Feb 22, 2024
94c2e69
fix http add fields
attilakreiner Feb 22, 2024
858be23
fix http event names
attilakreiner Feb 22, 2024
9f2d72a
fix add http authority
attilakreiner Feb 22, 2024
e807d04
fix
attilakreiner Feb 22, 2024
bd9c1a4
impl first k3po test in stdexp
attilakreiner Feb 26, 2024
434cbee
fix stdexp formats
attilakreiner Feb 26, 2024
a0b1ddb
fix engine test
attilakreiner Feb 26, 2024
bcd2944
fix kafka
attilakreiner Feb 26, 2024
9c54422
fix tls
attilakreiner Feb 26, 2024
115405d
fix bindingId in *EventContext
attilakreiner Feb 26, 2024
e1d6a98
fix idl
attilakreiner Feb 26, 2024
4f3ecc0
WIP jwt authFail
attilakreiner Feb 26, 2024
e01ae34
WIP jwt traceId, bindingId
attilakreiner Feb 26, 2024
15937c6
WIP rm mqtt
attilakreiner Feb 26, 2024
259e5fb
WIP fix Http11EventsIT
attilakreiner Feb 26, 2024
3ca772b
WIP fix rm http authFail
attilakreiner Feb 26, 2024
bb83580
fix kafka authFail
attilakreiner Feb 26, 2024
864a587
add http2 test
attilakreiner Feb 26, 2024
39ea9a7
add kafka test
attilakreiner Feb 26, 2024
18d823f
add tls test
attilakreiner Feb 26, 2024
814f74e
add tcp test
attilakreiner Feb 27, 2024
3922e15
add jwt test
attilakreiner Feb 27, 2024
15711a1
add sch-reg test
attilakreiner Feb 27, 2024
1cc1e1b
fix
attilakreiner Feb 27, 2024
9c4f1f3
fix tcp 1
attilakreiner Feb 27, 2024
7f9d7a6
fix tcp 2
attilakreiner Feb 27, 2024
e30ec02
fix tcp 3
attilakreiner Feb 27, 2024
0a0d97d
rm sch-reg test
attilakreiner Feb 28, 2024
9748627
ref jwt test
attilakreiner Feb 28, 2024
b149557
fix
attilakreiner Feb 28, 2024
3076f6f
revert model-json chg
attilakreiner Feb 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cloud/docker-image/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>exporter-stdout</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>metrics-stream</artifactId>
Expand Down
3 changes: 2 additions & 1 deletion cloud/docker-image/src/main/docker/zpm.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@
"io.aklivity.zilla:command-stop",
"io.aklivity.zilla:command-tune",
"io.aklivity.zilla:engine",
"io.aklivity.zilla:exporter-prometheus",
"io.aklivity.zilla:exporter-otlp",
"io.aklivity.zilla:exporter-prometheus",
"io.aklivity.zilla:exporter-stdout",
"io.aklivity.zilla:guard-jwt",
"io.aklivity.zilla:metrics-stream",
"io.aklivity.zilla:metrics-http",
Expand Down
20 changes: 20 additions & 0 deletions incubator/catalog-schema-registry.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@
<groupId>org.jasig.maven</groupId>
<artifactId>maven-notice-plugin</artifactId>
</plugin>
<plugin>
<groupId>${project.groupId}</groupId>
<artifactId>flyweight-maven-plugin</artifactId>
<version>${project.version}</version>
<configuration>
<scopeNames>core schema_registry</scopeNames>
<packageName>io.aklivity.zilla.specs.catalog.schema.registry.internal.types</packageName>
</configuration>
<executions>
<execution>
<goals>
<goal>validate</goal>
<goal>generate</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
Expand All @@ -86,6 +103,9 @@
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>io/aklivity/zilla/specs/catalog/schema/registry/internal/types/**/*.class</exclude>
</excludes>
<rules>
<rule>
<element>BUNDLE</element>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
scope schema_registry
{
scope event
{
enum SchemaRegistryEventType (uint8)
{
REMOTE_ACCESS_REJECTED (1)
}

struct SchemaRegistryRemoteAccessRejected extends core::event::Event
{
string8 method;
string16 url;
int16 status;
}

union SchemaRegistryEvent switch (SchemaRegistryEventType)
{
case REMOTE_ACCESS_REJECTED: SchemaRegistryRemoteAccessRejected remoteAccessRejected;
}
}
}
12 changes: 6 additions & 6 deletions incubator/catalog-schema-registry/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<jacoco.coverage.ratio>0.90</jacoco.coverage.ratio>
<jacoco.coverage.ratio>0.80</jacoco.coverage.ratio>
<jacoco.missed.count>0</jacoco.missed.count>
</properties>

Expand Down Expand Up @@ -71,16 +71,12 @@
<groupId>org.jasig.maven</groupId>
<artifactId>maven-notice-plugin</artifactId>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>${project.groupId}</groupId>
<artifactId>flyweight-maven-plugin</artifactId>
<version>${project.version}</version>
<configuration>
<scopeNames>internal</scopeNames>
<scopeNames>core schema_registry internal</scopeNames>
<packageName>io.aklivity.zilla.runtime.catalog.schema.registry.internal.types</packageName>
</configuration>
<executions>
Expand All @@ -91,6 +87,10 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.mycila</groupId>
<artifactId>license-maven-plugin</artifactId>
</plugin>
<plugin>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public String name()
public CatalogContext supply(
EngineContext context)
{
return new SchemaRegistryCatalogContext();
return new SchemaRegistryCatalogContext(context);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,25 @@
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryOptionsConfig;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.CatalogConfig;

public class SchemaRegistryCatalogContext implements CatalogContext
{
private final EngineContext context;

public SchemaRegistryCatalogContext(
EngineContext context)
{
this.context = context;
}

@Override
public CatalogHandler attach(
CatalogConfig catalog)
{
return new SchemaRegistryCatalogHandler(SchemaRegistryOptionsConfig.class.cast(catalog.options));
return new SchemaRegistryCatalogHandler(SchemaRegistryOptionsConfig.class.cast(catalog.options), context, catalog.id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryOptionsConfig;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.serializer.RegisterSchemaRequest;
import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.SchemaRegistryPrefixFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;

Expand All @@ -51,9 +52,13 @@ public class SchemaRegistryCatalogHandler implements CatalogHandler
private final Int2ObjectCache<String> schemas;
private final Int2ObjectCache<CachedSchemaId> schemaIds;
private final long maxAgeMillis;
private final SchemaRegistryEventContext event;
private final long catalogId;

public SchemaRegistryCatalogHandler(
SchemaRegistryOptionsConfig config)
SchemaRegistryOptionsConfig config,
EngineContext context,
long catalogId)
{
this.baseUrl = config.url;
this.client = HttpClient.newHttpClient();
Expand All @@ -62,6 +67,8 @@ public SchemaRegistryCatalogHandler(
this.schemas = new Int2ObjectCache<>(1, 1024, i -> {});
this.schemaIds = new Int2ObjectCache<>(1, 1024, i -> {});
this.maxAgeMillis = config.maxAge.toMillis();
this.event = new SchemaRegistryEventContext(context);
this.catalogId = catalogId;
}

@Override
Expand Down Expand Up @@ -209,12 +216,18 @@ private String sendHttpRequest(

try
{
HttpResponse<String> response = client.send(httpRequest, HttpResponse.BodyHandlers.ofString());
return response.statusCode() == 200 ? response.body() : null;
HttpResponse<String> httpResponse = client.send(httpRequest, HttpResponse.BodyHandlers.ofString());
boolean success = httpResponse.statusCode() == 200;
String responseBody = success ? httpResponse.body() : null;
if (!success)
{
event.remoteAccessRejected(catalogId, httpRequest, httpResponse.statusCode());
}
return responseBody;
}
catch (Exception ex)
{
ex.printStackTrace(System.out);
event.remoteAccessRejected(catalogId, httpRequest, 0);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2021-2023 Aklivity Inc
*
* Licensed under the Aklivity Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://www.aklivity.io/aklivity-community-license/
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package io.aklivity.zilla.runtime.catalog.schema.registry.internal;

import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.time.Clock;

import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.types.event.SchemaRegistryEventFW;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.binding.function.MessageConsumer;

public class SchemaRegistryEventContext
{
private static final int EVENT_BUFFER_CAPACITY = 1024;

private final SchemaRegistryEventFW.Builder schemaRegistryEventRW = new SchemaRegistryEventFW.Builder();
private final MutableDirectBuffer eventBuffer = new UnsafeBuffer(ByteBuffer.allocate(EVENT_BUFFER_CAPACITY));
private final int schemaRegistryTypeId;
private final MessageConsumer eventWriter;
private final Clock clock;

public SchemaRegistryEventContext(
EngineContext context)
{
this.schemaRegistryTypeId = context.supplyTypeId(SchemaRegistryCatalog.NAME);
this.eventWriter = context.supplyEventWriter();
this.clock = context.clock();
}

public void remoteAccessRejected(
long catalogId,
HttpRequest httpRequest,
int status)
{
SchemaRegistryEventFW event = schemaRegistryEventRW
.wrap(eventBuffer, 0, eventBuffer.capacity())
.remoteAccessRejected(e -> e
.timestamp(clock.millis())
.traceId(0L)
.namespacedId(catalogId)
.method(httpRequest.method())
.url(httpRequest.uri().toString())
.status((short) status)
)
.build();
eventWriter.accept(schemaRegistryTypeId, event.buffer(), event.offset(), event.limit());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.rules.RuleChain.outerRule;
import static org.mockito.Mockito.mock;

import java.time.Duration;

Expand All @@ -35,6 +36,7 @@
import org.kaazing.k3po.junit.rules.K3poRule;

import io.aklivity.zilla.runtime.catalog.schema.registry.internal.config.SchemaRegistryOptionsConfig;
import io.aklivity.zilla.runtime.engine.EngineContext;
import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;

Expand All @@ -49,6 +51,7 @@ public class SchemaRegistryIT
public final TestRule chain = outerRule(k3po).around(timeout);

private SchemaRegistryOptionsConfig config;
private EngineContext context = mock(EngineContext.class);

@Before
public void setup()
Expand All @@ -69,7 +72,7 @@ public void shouldResolveSchemaViaSchemaId() throws Exception
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

String schema = catalog.resolve(9);

Expand All @@ -88,7 +91,7 @@ public void shouldResolveSchemaViaSubjectVersion() throws Exception
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

int schemaId = catalog.resolve("items-snapshots-value", "latest");

Expand All @@ -109,7 +112,7 @@ public void shouldRegisterSchema() throws Exception
String schema = "{\"type\": \"record\",\"name\": \"test\",\"fields\":[{\"type\": \"string\",\"name\": \"field1\"}," +
"{\"type\": \"com.acme.Referenced\",\"name\": \"int\"}]}";

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

int schemaId = catalog.register("items-snapshots-value", "avro", schema);

Expand All @@ -128,7 +131,7 @@ public void shouldResolveSchemaViaSchemaIdFromCache() throws Exception
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

catalog.resolve(9);

Expand All @@ -149,7 +152,7 @@ public void shouldResolveSchemaViaSubjectVersionFromCache() throws Exception
"{\"name\":\"status\",\"type\":\"string\"}]," +
"\"name\":\"Event\",\"namespace\":\"io.aklivity.example\",\"type\":\"record\"}";

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

catalog.resolve(catalog.resolve("items-snapshots-value", "latest"));

Expand All @@ -167,15 +170,15 @@ public void shouldResolveSchemaViaSubjectVersionFromCache() throws Exception
@Test
public void shouldVerifyMaxPadding()
{
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

assertEquals(5, catalog.encodePadding());
}

@Test
public void shouldVerifyEncodedData()
{
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

DirectBuffer data = new UnsafeBuffer();

Expand All @@ -191,7 +194,7 @@ public void shouldVerifyEncodedData()
public void shouldResolveSchemaIdAndProcessData()
{

SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

DirectBuffer data = new UnsafeBuffer();

Expand All @@ -207,7 +210,7 @@ public void shouldResolveSchemaIdAndProcessData()
@Test
public void shouldResolveSchemaIdFromData()
{
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config);
SchemaRegistryCatalogHandler catalog = new SchemaRegistryCatalogHandler(config, context, 0L);

DirectBuffer data = new UnsafeBuffer();

Expand Down
Loading