Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect missing events in test exporter #1128

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import static java.lang.System.currentTimeMillis;
import static java.lang.ThreadLocal.withInitial;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.agrona.CloseHelper.quietClose;
import static org.agrona.LangUtil.rethrowUnchecked;
import static org.agrona.concurrent.AgentRunner.startOnThread;

import java.net.InetAddress;
Expand Down Expand Up @@ -64,7 +66,6 @@
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;

import org.agrona.CloseHelper;
import org.agrona.DeadlineTimerWheel;
import org.agrona.DeadlineTimerWheel.TimerHandler;
import org.agrona.DirectBuffer;
Expand Down Expand Up @@ -789,8 +790,15 @@ public void doStart()

public void doClose()
{
CloseHelper.close(runner);
thread = null;
try
{
Consumer<Thread> timeout = t -> rethrowUnchecked(new IllegalStateException("close timeout"));
runner.close((int) SECONDS.toMillis(5L), timeout);
}
finally
{
thread = null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ private static WatchService newWatchService(
{
try
{
watcher = fileSystem.newWatchService();
watcher = newWatchService(fileSystem);
}
catch (UnsupportedOperationException ex)
{
Expand All @@ -302,4 +302,10 @@ private static WatchService newWatchService(

return watcher;
}

private static WatchService newWatchService(
FileSystem fileSystem) throws IOException
{
return fileSystem.newWatchService();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,14 @@ public void evaluate() throws Throwable
}
finally
{
if (!allowErrors)
{
assertEmpty(errors);
}
if (fs != null)
{
fs.close();
}
if (!allowErrors)
{
assertEmpty(errors);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import io.aklivity.zilla.runtime.engine.config.RouteConfig;
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
import io.aklivity.zilla.runtime.engine.guard.GuardHandler;
import io.aklivity.zilla.runtime.engine.model.ConverterHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
import io.aklivity.zilla.runtime.engine.namespace.NamespacedId;
import io.aklivity.zilla.runtime.engine.test.internal.binding.config.TestBindingOptionsConfig;
import io.aklivity.zilla.runtime.engine.test.internal.binding.config.TestBindingOptionsConfig.CatalogAssertion;
Expand Down Expand Up @@ -79,6 +81,7 @@ final class TestBindingFactory implements BindingHandler
private final Long2LongHashMap router;
private final TestEventContext event;

private ConverterHandler valueType;
private List<CatalogHandler> catalogs;
private SchemaConfig catalog;
private List<CatalogAssertion> catalogAssertions;
Expand Down Expand Up @@ -111,6 +114,11 @@ public void attach(
TestBindingOptionsConfig options = (TestBindingOptionsConfig) binding.options;
if (options != null)
{
if (options.value != null)
{
this.valueType = context.supplyWriteConverter(options.value);
}

if (options.cataloged != null)
{
this.catalog = options.cataloged.size() != 0 ? options.cataloged.get(0).schemas.get(0) : null;
Expand All @@ -119,18 +127,21 @@ public void attach(
{
int namespaceId = context.supplyTypeId(binding.namespace);
int catalogId = context.supplyTypeId(catalog.name);
catalogs.add(context.supplyCatalog(NamespacedId.id(namespaceId, catalogId)));
final CatalogHandler handler = context.supplyCatalog(NamespacedId.id(namespaceId, catalogId));
catalogs.add(handler);
}
this.catalogAssertions = options.catalogAssertions != null && !options.catalogAssertions.isEmpty() ?
options.catalogAssertions.get(0).assertions : null;
}

if (options.authorization != null)
{
int namespaceId = context.supplyTypeId(binding.namespace);
int guardId = context.supplyTypeId(options.authorization.name);
this.guard = context.supplyGuard(NamespacedId.id(namespaceId, guardId));
this.credentials = options.authorization.credentials;
}

this.events = options.events;
}
}
Expand Down Expand Up @@ -336,7 +347,16 @@ private void onInitialData(

initialSeq = sequence + reserved;

target.doInitialData(traceId, flags, reserved, payload);
if (valueType != null &&
valueType.convert(traceId, routedId, payload.buffer(), payload.offset(), payload.sizeof(),
ValueConsumer.NOP) < 0)
{
target.doInitialAbort(traceId);
}
else
{
target.doInitialData(traceId, flags, reserved, payload);
}
}

private void onInitialEnd(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import java.util.function.Function;

import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
import io.aklivity.zilla.runtime.engine.config.ModelConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public final class TestBindingOptionsConfig extends OptionsConfig
{
public final ModelConfig value;
public final String mode;
public final TestAuthorizationConfig authorization;
public final List<CatalogedConfig> cataloged;
Expand All @@ -41,12 +43,15 @@ public static <T> TestBindingOptionsConfigBuilder<T> builder(
}

TestBindingOptionsConfig(
ModelConfig value,
String mode,
TestAuthorizationConfig authorization,
List<CatalogedConfig> cataloged,
List<Event> events,
List<CatalogAssertions> catalogAssertions)
{
super(value != null ? List.of(value) : List.of(), List.of());
this.value = value;
this.mode = mode;
this.authorization = authorization;
this.cataloged = cataloged;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import jakarta.json.JsonValue;

import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
import io.aklivity.zilla.runtime.engine.config.ModelConfigAdapter;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfigAdapterSpi;
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
Expand All @@ -35,6 +36,7 @@ public final class TestBindingOptionsConfigAdapter implements OptionsConfigAdapt
{
public static final String DEFAULT_ASSERTION_SCHEMA = new String();

private static final String VALUE_NAME = "value";
private static final String MODE_NAME = "mode";
private static final String CATALOG_NAME = "catalog";
private static final String AUTHORIZATION_NAME = "authorization";
Expand All @@ -47,6 +49,8 @@ public final class TestBindingOptionsConfigAdapter implements OptionsConfigAdapt
private static final String SCHEMA_NAME = "schema";
private static final String DELAY_NAME = "delay";

private final ModelConfigAdapter model = new ModelConfigAdapter();

private final SchemaConfigAdapter schema = new SchemaConfigAdapter();

@Override
Expand All @@ -69,10 +73,16 @@ public JsonObject adaptToJson(

JsonObjectBuilder object = Json.createObjectBuilder();

if (testOptions.value != null)
{
object.add(VALUE_NAME, model.adaptToJson(testOptions.value));
}

if (testOptions.mode != null)
{
object.add(MODE_NAME, testOptions.mode);
}

if (testOptions.cataloged != null && !testOptions.cataloged.isEmpty())
{
JsonObjectBuilder catalogs = Json.createObjectBuilder();
Expand All @@ -87,6 +97,7 @@ public JsonObject adaptToJson(
}
object.add(CATALOG_NAME, catalogs);
}

if (testOptions.catalogAssertions != null)
{
JsonObjectBuilder assertions = Json.createObjectBuilder();
Expand All @@ -107,6 +118,7 @@ public JsonObject adaptToJson(
assertions.add(CATALOG_NAME, catalogAssertions);
object.add(ASSERTIONS_NAME, assertions);
}

if (testOptions.authorization != null)
{
JsonObjectBuilder credentials = Json.createObjectBuilder();
Expand All @@ -115,6 +127,7 @@ public JsonObject adaptToJson(
authorization.add(testOptions.authorization.name, credentials);
object.add(AUTHORIZATION_NAME, authorization);
}

if (testOptions.events != null)
{
JsonArrayBuilder events = Json.createArrayBuilder();
Expand All @@ -139,10 +152,16 @@ public OptionsConfig adaptFromJson(

if (object != null)
{
if (object.containsKey(VALUE_NAME))
{
testOptions.value(model.adaptFromJson(object.get(VALUE_NAME)));
}

if (object.containsKey(MODE_NAME))
{
testOptions.mode(object.getString(MODE_NAME));
}

if (object.containsKey(CATALOG_NAME))
{
JsonObject catalogsJson = object.getJsonObject(CATALOG_NAME);
Expand All @@ -161,6 +180,7 @@ public OptionsConfig adaptFromJson(
}
testOptions.catalog(catalogs);
}

if (object.containsKey(ASSERTIONS_NAME))
{
JsonObject assertionsJson = object.getJsonObject(ASSERTIONS_NAME);
Expand All @@ -184,6 +204,7 @@ public OptionsConfig adaptFromJson(
}
}
}

if (object.containsKey(AUTHORIZATION_NAME))
{
JsonObject authorization = object.getJsonObject(AUTHORIZATION_NAME);
Expand All @@ -198,6 +219,7 @@ public OptionsConfig adaptFromJson(
}
}
}

if (object.containsKey(EVENTS_NAME))
{
JsonArray events = object.getJsonArray(EVENTS_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@

import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
import io.aklivity.zilla.runtime.engine.config.ConfigBuilder;
import io.aklivity.zilla.runtime.engine.config.ModelConfig;
import io.aklivity.zilla.runtime.engine.config.OptionsConfig;

public final class TestBindingOptionsConfigBuilder<T> extends ConfigBuilder<T, TestBindingOptionsConfigBuilder<T>>
{
private final Function<OptionsConfig, T> mapper;

private ModelConfig value;
private String mode;
private TestAuthorizationConfig authorization;
private List<CatalogedConfig> catalogs;
Expand All @@ -46,6 +48,13 @@ protected Class<TestBindingOptionsConfigBuilder<T>> thisType()
return (Class<TestBindingOptionsConfigBuilder<T>>) getClass();
}

public TestBindingOptionsConfigBuilder<T> value(
ModelConfig value)
{
this.value = value;
return this;
}

public TestBindingOptionsConfigBuilder<T> mode(
String mode)
{
Expand Down Expand Up @@ -95,6 +104,6 @@ public TestBindingOptionsConfigBuilder<T> catalogAssertions(
@Override
public T build()
{
return mapper.apply(new TestBindingOptionsConfig(mode, authorization, catalogs, events, catalogAssertions));
return mapper.apply(new TestBindingOptionsConfig(value, mode, authorization, catalogs, events, catalogAssertions));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,19 @@ public int export()
@Override
public void stop()
{
try
{
// drain events
while (options.events != null &&
eventIndex < options.events.size())
{
readEvent.read(this::handleEvent, Integer.MAX_VALUE);
}
}
catch (Exception ex)
{
assert options.events == null || eventIndex == options.events.size();
}
}

private void handleEvent(
Expand Down
4 changes: 2 additions & 2 deletions runtime/engine/src/test/resources/FileSystemHelper.btm
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
RULE watcher service failed
CLASS ^java.nio.file.FileSystem
METHOD newWatchService
CLASS io.aklivity.zilla.runtime.engine.internal.watcher.EngineConfigWatcher
METHOD newWatchService(java.nio.file.FileSystem)
IF TRUE
DO throw new java.io.IOException("[failed]")
ENDRULE
Loading