Model specific cache detect schema change update #767

Merged · 3 commits · Jan 31, 2024
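In short: each model handler now keeps a CRC32C checksum of the last schema text it resolved per schema id, and evicts its derived caches (schemas, readers, writers, records, paddings) whenever the catalog returns text with a different checksum. A minimal standalone illustration of the detection step, using java.util.zip.CRC32C as the PR does (the class name and sample schemas here are illustrative):

import java.util.zip.CRC32C;

public final class SchemaChangeDetect
{
    // Checksum a schema document the same way the PR's generateCRC32C helper does.
    static int crc32c(String text)
    {
        CRC32C crc = new CRC32C();
        byte[] bytes = text.getBytes();
        crc.update(bytes, 0, bytes.length);
        return (int) crc.getValue();
    }

    public static void main(String[] args)
    {
        String v1 = "{\"type\":\"record\",\"name\":\"Old\",\"fields\":[]}";
        String v2 = "{\"type\":\"record\",\"name\":\"New\",\"fields\":[]}";
        // A changed checksum is the signal to invalidate cached parsers,
        // readers, and writers for that schema id.
        System.out.println(crc32c(v1) != crc32c(v2)); // prints true
    }
}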
AvroModelHandler.java
@@ -20,6 +20,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.LongFunction;
import java.util.zip.CRC32C;

import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
@@ -43,7 +44,7 @@
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig;

public abstract class AvroConverterHandler
public abstract class AvroModelHandler
{
protected static final String VIEW_JSON = "json";

@@ -67,8 +68,10 @@ public abstract class AvroConverterHandler
private final Int2ObjectCache<GenericDatumWriter<GenericRecord>> writers;
private final Int2ObjectCache<GenericRecord> records;
private final Int2IntHashMap paddings;
private final Int2IntHashMap crcCache;
private final CRC32C crc32c;

protected AvroConverterHandler(
protected AvroModelHandler(
AvroModelConfig config,
LongFunction<CatalogHandler> supplyCatalog)
{
@@ -90,6 +93,8 @@ protected AvroConverterHandler(
this.paddings = new Int2IntHashMap(-1);
this.expandable = new ExpandableDirectBufferOutputStream(new ExpandableDirectByteBuffer());
this.in = new DirectBufferInputStream();
this.crc32c = new CRC32C();
this.crcCache = new Int2IntHashMap(0);
}

protected final boolean validate(
@@ -101,6 +106,7 @@ protected final boolean validate(
boolean status = false;
try
{
invalidateCacheOnSchemaUpdate(schemaId);
GenericRecord record = supplyRecord(schemaId);
in.wrap(buffer, index, length);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
@@ -147,6 +153,26 @@ protected final GenericRecord supplyRecord(
return records.computeIfAbsent(schemaId, this::createRecord);
}

protected void invalidateCacheOnSchemaUpdate(
int schemaId)
{
if (crcCache.containsKey(schemaId))
{
String schemaText = handler.resolve(schemaId);
if (schemaText != null && crcCache.get(schemaId) != generateCRC32C(schemaText))
{
crcCache.remove(schemaId);
schemas.remove(schemaId);
readers.remove(schemaId);
writers.remove(schemaId);
records.remove(schemaId);
paddings.remove(schemaId);
}
}
}

private GenericDatumReader<GenericRecord> createReader(
int schemaId)
{
@@ -191,6 +217,7 @@ private Schema resolveSchema(
if (schemaText != null)
{
schema = new Schema.Parser().parse(schemaText);
crcCache.put(schemaId, generateCRC32C(schemaText));
}
return schema;
}
@@ -217,4 +244,13 @@ private int calculatePadding(
}
return padding;
}

private int generateCRC32C(
String schemaText)
{
byte[] bytes = schemaText.getBytes();
Contributor comment:
This line is going to allocate a new byte[] on every call to validate.
Let's instead store the actual schemaText rather than a crc32c hash to compare.
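A minimal sketch of the suggested alternative, assuming the handler keeps the resolved schema text in an Agrona Int2ObjectCache alongside its other per-schema caches (the schemaTexts field name is illustrative, not from the PR):

import org.agrona.collections.Int2ObjectCache;

// Cache the resolved schema text itself rather than a CRC32C of it.
private final Int2ObjectCache<String> schemaTexts =
    new Int2ObjectCache<>(1, 1024, i -> {});

protected void invalidateCacheOnSchemaUpdate(
    int schemaId)
{
    String schemaText = handler.resolve(schemaId);
    String cached = schemaTexts.get(schemaId);
    // String.equals compares in place, so no byte[] is allocated per call,
    // and a checksum collision can no longer mask a schema change.
    if (schemaText != null && cached != null && !schemaText.equals(cached))
    {
        schemaTexts.remove(schemaId);
        schemas.remove(schemaId);
        readers.remove(schemaId);
        writers.remove(schemaId);
        records.remove(schemaId);
        paddings.remove(schemaId);
    }
}

resolveSchema would then seed schemaTexts.put(schemaId, schemaText) where it currently seeds crcCache.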

crc32c.reset();
crc32c.update(bytes, 0, bytes.length);
return (int) crc32c.getValue();
}
}
AvroReadConverterHandler.java
@@ -32,7 +32,7 @@
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig;

public class AvroReadConverterHandler extends AvroConverterHandler implements ConverterHandler
public class AvroReadConverterHandler extends AvroModelHandler implements ConverterHandler
{
public AvroReadConverterHandler(
AvroModelConfig config,
@@ -125,6 +125,7 @@ private void deserializeRecord(
{
try
{
invalidateCacheOnSchemaUpdate(schemaId);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
GenericDatumWriter<GenericRecord> writer = supplyWriter(schemaId);
if (reader != null)
AvroWriteConverterHandler.java
@@ -29,7 +29,7 @@
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
import io.aklivity.zilla.runtime.model.avro.config.AvroModelConfig;

public class AvroWriteConverterHandler extends AvroConverterHandler implements ConverterHandler
public class AvroWriteConverterHandler extends AvroModelHandler implements ConverterHandler
{
public AvroWriteConverterHandler(
AvroModelConfig config,
@@ -80,6 +80,7 @@ private int serializeJsonRecord(
{
try
{
invalidateCacheOnSchemaUpdate(schemaId);
Schema schema = supplySchema(schemaId);
GenericDatumReader<GenericRecord> reader = supplyReader(schemaId);
GenericDatumWriter<GenericRecord> writer = supplyWriter(schemaId);
AvroModelFactorySpiTest.java
@@ -31,7 +31,7 @@
public class AvroModelFactorySpiTest
{
@Test
public void shouldCreateReader()
public void shouldLoadAndCreate()
{
Configuration config = new Configuration();
ModelFactory factory = ModelFactory.instantiate();
@@ -51,7 +51,7 @@ public void shouldCreateReader()
.build();

assertThat(model, instanceOf(AvroModel.class));
assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(AvroConverterHandler.class));
assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(AvroConverterHandler.class));
assertThat(context.supplyReadConverterHandler(modelConfig), instanceOf(AvroReadConverterHandler.class));
assertThat(context.supplyWriteConverterHandler(modelConfig), instanceOf(AvroWriteConverterHandler.class));
}
}
JsonModelHandler.java
@@ -16,12 +16,14 @@

import java.io.StringReader;
import java.util.function.LongFunction;
import java.util.zip.CRC32C;

import jakarta.json.spi.JsonProvider;
import jakarta.json.stream.JsonParser;
import jakarta.json.stream.JsonParserFactory;

import org.agrona.DirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectCache;
import org.agrona.io.DirectBufferInputStream;
import org.leadpony.justify.api.JsonSchema;
@@ -35,7 +37,7 @@
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig;

public abstract class JsonConverterHandler
public abstract class JsonModelHandler
{
protected final SchemaConfig catalog;
protected final CatalogHandler handler;
@@ -46,9 +48,11 @@ public abstract class JsonConverterHandler
private final JsonProvider schemaProvider;
private final JsonValidationService service;
private final JsonParserFactory factory;
private final CRC32C crc32c;
private final Int2IntHashMap crcCache;
private DirectBufferInputStream in;

public JsonConverterHandler(
public JsonModelHandler(
JsonModelConfig config,
LongFunction<CatalogHandler> supplyCatalog)
{
@@ -64,6 +68,8 @@ public JsonConverterHandler(
this.schemas = new Int2ObjectCache<>(1, 1024, i -> {});
this.providers = new Int2ObjectCache<>(1, 1024, i -> {});
this.in = new DirectBufferInputStream();
this.crc32c = new CRC32C();
this.crcCache = new Int2IntHashMap(0);
}

protected final boolean validate(
@@ -75,6 +81,7 @@ protected final boolean validate(
boolean status = false;
try
{
invalidateCacheOnSchemaUpdate(schemaId);
JsonProvider provider = supplyProvider(schemaId);
in.wrap(buffer, index, length);
provider.createReader(in).readValue();
@@ -87,18 +94,34 @@ protected final boolean validate(
return status;
}

private JsonSchema supplySchema(
protected void invalidateCacheOnSchemaUpdate(
int schemaId)
{
return schemas.computeIfAbsent(schemaId, this::resolveSchema);
if (crcCache.containsKey(schemaId))
{
String schemaText = handler.resolve(schemaId);
if (schemaText != null && crcCache.get(schemaId) != generateCRC32C(schemaText))
{
crcCache.remove(schemaId);
schemas.remove(schemaId);
providers.remove(schemaId);
}
}
}

private JsonProvider supplyProvider(
protected JsonProvider supplyProvider(
int schemaId)
{
return providers.computeIfAbsent(schemaId, this::createProvider);
}

private JsonSchema supplySchema(
int schemaId)
{
return schemas.computeIfAbsent(schemaId, this::resolveSchema);
}

private JsonSchema resolveSchema(
int schemaId)
{
@@ -109,6 +132,7 @@ private JsonSchema resolveSchema(
JsonParser schemaParser = factory.createParser(new StringReader(schemaText));
JsonSchemaReader reader = service.createSchemaReader(schemaParser);
schema = reader.read();
crcCache.put(schemaId, generateCRC32C(schemaText));
}

return schema;
@@ -125,4 +149,13 @@ private JsonProvider createProvider(
}
return provider;
}

private int generateCRC32C(
String schemaText)
{
byte[] bytes = schemaText.getBytes();
Contributor comment:
Same feedback here as for avro.

crc32c.reset();
crc32c.update(bytes, 0, bytes.length);
return (int) crc32c.getValue();
}
}
JsonReadConverterHandler.java
@@ -25,7 +25,7 @@
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig;

public class JsonReadConverterHandler extends JsonConverterHandler implements ConverterHandler
public class JsonReadConverterHandler extends JsonModelHandler implements ConverterHandler
{
public JsonReadConverterHandler(
JsonModelConfig config,
JsonValidatorHandler.java
@@ -14,40 +14,23 @@
*/
package io.aklivity.zilla.runtime.model.json.internal;

import java.io.StringReader;
import java.util.function.LongFunction;

import jakarta.json.spi.JsonProvider;
import jakarta.json.stream.JsonParser;
import jakarta.json.stream.JsonParserFactory;
import jakarta.json.stream.JsonParsingException;

import org.agrona.DirectBuffer;
import org.agrona.ExpandableDirectByteBuffer;
import org.agrona.collections.Int2ObjectCache;
import org.agrona.io.DirectBufferInputStream;
import org.leadpony.justify.api.JsonSchema;
import org.leadpony.justify.api.JsonSchemaReader;
import org.leadpony.justify.api.JsonValidationService;
import org.leadpony.justify.api.ProblemHandler;

import io.aklivity.zilla.runtime.engine.catalog.CatalogHandler;
import io.aklivity.zilla.runtime.engine.config.CatalogedConfig;
import io.aklivity.zilla.runtime.engine.config.SchemaConfig;
import io.aklivity.zilla.runtime.engine.model.ValidatorHandler;
import io.aklivity.zilla.runtime.engine.model.function.ValueConsumer;
import io.aklivity.zilla.runtime.model.json.config.JsonModelConfig;

public class JsonValidatorHandler implements ValidatorHandler
public class JsonValidatorHandler extends JsonModelHandler implements ValidatorHandler
{
private final SchemaConfig catalog;
private final CatalogHandler handler;
private final String subject;
private final Int2ObjectCache<JsonSchema> schemas;
private final Int2ObjectCache<JsonProvider> providers;
private final JsonProvider schemaProvider;
private final JsonValidationService service;
private final JsonParserFactory factory;
private final DirectBufferInputStream in;
private final ExpandableDirectByteBuffer buffer;

@@ -58,17 +41,7 @@ public JsonValidatorHandler(
JsonModelConfig config,
LongFunction<CatalogHandler> supplyCatalog)
{
this.schemaProvider = JsonProvider.provider();
this.service = JsonValidationService.newInstance();
this.factory = schemaProvider.createParserFactory(null);
CatalogedConfig cataloged = config.cataloged.get(0);
this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null;
this.handler = supplyCatalog.apply(cataloged.id);
this.subject = catalog != null && catalog.subject != null
? catalog.subject
: config.subject;
this.schemas = new Int2ObjectCache<>(1, 1024, i -> {});
this.providers = new Int2ObjectCache<>(1, 1024, i -> {});
super(config, supplyCatalog);
this.buffer = new ExpandableDirectByteBuffer();
this.in = new DirectBufferInputStream(buffer);
}
@@ -83,10 +56,6 @@ public boolean validate(
{
boolean status = true;

int schemaId = catalog != null && catalog.id > 0
? catalog.id
: handler.resolve(subject, catalog.version);

try
{
if ((flags & FLAGS_INIT) != 0x00)
@@ -100,6 +69,12 @@
if ((flags & FLAGS_FIN) != 0x00)
{
in.wrap(buffer, 0, progress);

int schemaId = catalog != null && catalog.id > 0
? catalog.id
: handler.resolve(subject, catalog.version);
invalidateCacheOnSchemaUpdate(schemaId);

JsonProvider provider = supplyProvider(schemaId);
parser = provider.createParser(in);
while (parser.hasNext())
@@ -116,43 +91,4 @@

return status;
}

private JsonSchema supplySchema(
int schemaId)
{
return schemas.computeIfAbsent(schemaId, this::resolveSchema);
}

private JsonProvider supplyProvider(
int schemaId)
{
return providers.computeIfAbsent(schemaId, this::createProvider);
}

private JsonSchema resolveSchema(
int schemaId)
{
JsonSchema schema = null;
String schemaText = handler.resolve(schemaId);
if (schemaText != null)
{
JsonParser schemaParser = factory.createParser(new StringReader(schemaText));
JsonSchemaReader reader = service.createSchemaReader(schemaParser);
schema = reader.read();
}

return schema;
}

private JsonProvider createProvider(
int schemaId)
{
JsonSchema schema = supplySchema(schemaId);
JsonProvider provider = null;
if (schema != null)
{
provider = service.createJsonProvider(schema, parser -> ProblemHandler.throwing());
}
return provider;
}
}