Skip to content
This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Commit

Permalink
Avro batch format (#1430)
Browse files Browse the repository at this point in the history
* This commit introduces the Avro batch format for the Nakadi Publishing API
  • Loading branch information
adyach authored Jun 16, 2022
1 parent 856a353 commit f8816a5
Show file tree
Hide file tree
Showing 69 changed files with 1,032 additions and 550 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.zalando.nakadi.repository.kafka;

import com.codahale.metrics.MetricRegistry;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
Expand Down Expand Up @@ -78,7 +77,6 @@ public class KafkaRepositoryAT extends BaseAT {

private NakadiSettings nakadiSettings;
private KafkaSettings kafkaSettings;
private ZookeeperSettings zookeeperSettings;
private KafkaTestHelper kafkaHelper;
private KafkaTopicRepository kafkaTopicRepository;
private NakadiTopicConfig defaultTopicConfig;
Expand Down Expand Up @@ -110,7 +108,6 @@ public void setup() {
kafkaSettings = new KafkaSettings(KAFKA_RETRIES, KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY,
KAFKA_LINGER_MS, KAFKA_ENABLE_AUTO_COMMIT, KAFKA_MAX_REQUEST_SIZE,
KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT, "", KAFKA_COMPRESSION_TYPE);
zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZK_MAX_IN_FLIGHT_REQUESTS);
kafkaHelper = new KafkaTestHelper(KAFKA_URL);
defaultTopicConfig = new NakadiTopicConfig(DEFAULT_PARTITION_COUNT, DEFAULT_CLEANUP_POLICY,
Optional.of(DEFAULT_RETENTION_TIME));
Expand Down Expand Up @@ -290,10 +287,8 @@ private KafkaTopicRepository createKafkaTopicRepository() {
.setKafkaFactory(factory)
.setNakadiSettings(nakadiSettings)
.setKafkaSettings(kafkaSettings)
.setZookeeperSettings(zookeeperSettings)
.setKafkaTopicConfigFactory(kafkaTopicConfigFactory)
.setKafkaLocationManager(kafkaLocationManager)
.setMetricRegistry(new MetricRegistry())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,94 @@
package org.zalando.nakadi.webservice;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.junit.Ignore;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.EncoderFactory;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.core.io.DefaultResourceLoader;
import org.zalando.nakadi.domain.EnrichmentStrategyDescriptor;
import org.zalando.nakadi.domain.EventCategory;
import org.zalando.nakadi.domain.EventTypeSchema;
import org.zalando.nakadi.domain.EventTypeSchemaBase;
import org.zalando.nakadi.domain.Subscription;
import org.zalando.nakadi.generated.avro.Envelope;
import org.zalando.nakadi.generated.avro.Metadata;
import org.zalando.nakadi.generated.avro.PublishingBatch;
import org.zalando.nakadi.utils.EventTypeTestBuilder;
import org.zalando.nakadi.utils.RandomSubscriptionBuilder;
import org.zalando.nakadi.utils.TestUtils;
import org.zalando.nakadi.webservice.utils.NakadiTestUtils;
import org.zalando.nakadi.webservice.utils.TestStreamingClient;

import java.util.Base64;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.List;
import java.util.Map;

import static com.jayway.restassured.RestAssured.given;
import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;

public class BinaryEndToEndAT extends BaseAT {
private static final String TEST_ET_NAME = "nakadi.test-2022-05-06.et";
private static final String TEST_DATA = "BAAAAGO0v/qJk2BIZjU5ZmVlNTQtMmNhYy00MTAzLWI4NTItOGMwOGRiZjhlNjEyAhJ0ZX" +
"N0LWZsb3cAAjECEnRlc3QtdXNlcjJuYWthZGkudGVzdC0yMDIyLTA1LTA2LmV0AAAAAAAAAABBCFBPU1QYL2V2ZW50LXR5cGVzAB50ZX" +
"N0LXVzZXItYWdlbnQMbmFrYWRpFGhhc2hlZC1hcHCSAxQCLQQtLfYBggU=";

@Test
@Ignore
public void testAvroPublishing() throws JsonProcessingException {
public void testAvroPublishingAndJsonConsumption() throws IOException {
final Schema schema = new Schema.Parser().parse(new DefaultResourceLoader()
.getResource("nakadi.end2end.avsc").getInputStream());
final var et = EventTypeTestBuilder.builder()
.name(TEST_ET_NAME)
.category(EventCategory.BUSINESS)
.enrichmentStrategies(List.of(EnrichmentStrategyDescriptor.METADATA_ENRICHMENT))
.schema(new EventTypeSchema(new EventTypeSchemaBase(
EventTypeSchemaBase.Type.AVRO_SCHEMA,
schema.toString()), "1.0.0", TestUtils.randomDate()))
.build();
NakadiTestUtils.createEventTypeInNakadi(et);

final byte[] body = Base64.getDecoder().decode(TEST_DATA);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
new GenericDatumWriter(schema).write(
new GenericRecordBuilder(schema).set("foo", "bar").build(),
EncoderFactory.get().directBinaryEncoder(baos, null));

final PublishingBatch batch = PublishingBatch.newBuilder()
.setEvents(List.of(Envelope.newBuilder()
.setMetadata(Metadata.newBuilder()
.setEventType(TEST_ET_NAME)
.setVersion("1.0.0")
.setOccurredAt(Instant.now())
.setEid("CE8C9EBC-3F19-4B9D-A453-08AD2EDA6028")
.build())
.setPayload(ByteBuffer.wrap(baos.toByteArray()))
.build()))
.build();

final ByteBuffer body = PublishingBatch.getEncoder().encode(batch);
final var response = given()
.contentType("application/avro-binary; charset=utf-8")
.body(body)
.contentType("application/avro-binary")
.body(body.array())
.post(String.format("/event-types/%s/events", TEST_ET_NAME));
response.print();
response.then().statusCode(200);
// TODO add the consumption side once schema creation is done.

// check event is consumed and format is correct
final Subscription subscription = createSubscription(
RandomSubscriptionBuilder.builder()
.withEventType(TEST_ET_NAME)
.withStartFrom(BEGIN)
.buildSubscriptionBase());
final TestStreamingClient client = TestStreamingClient.create(subscription.getId()).start();

TestUtils.waitFor(() -> Assert.assertEquals(1, client.getBatches().size()));
final Map event = client.getBatches().get(0).getEvents().get(0);
Assert.assertEquals("bar", event.get("foo"));

final Map<String, Object> metadata = (Map<String, Object>) event.get("metadata");
Assert.assertEquals("CE8C9EBC-3F19-4B9D-A453-08AD2EDA6028", metadata.get("eid"));
Assert.assertEquals("1.0.0", metadata.get("version"));
Assert.assertEquals(TEST_ET_NAME, metadata.get("event_type"));
}
}
11 changes: 11 additions & 0 deletions acceptance-test/src/acceptance-test/resources/nakadi.end2end.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"name": "TestEnd2End",
"namespace": "org.zalando.nakadi.generated.avro",
"type": "record",
"fields": [
{
"name": "foo",
"type": "string"
}
]
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.zalando.nakadi;

import com.google.common.base.Charsets;
import com.google.common.io.CountingInputStream;
import io.opentracing.tag.Tags;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
Expand All @@ -23,6 +24,7 @@
import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException;
import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
import org.zalando.nakadi.mapper.NakadiRecordMapper;
import org.zalando.nakadi.metrics.EventTypeMetricRegistry;
import org.zalando.nakadi.metrics.EventTypeMetrics;
import org.zalando.nakadi.security.Client;
Expand All @@ -32,11 +34,11 @@
import org.zalando.nakadi.service.publishing.BinaryEventPublisher;
import org.zalando.nakadi.service.publishing.EventPublisher;
import org.zalando.nakadi.service.publishing.NakadiKpiPublisher;
import org.zalando.nakadi.service.publishing.NakadiRecordMapper;
import org.zalando.nakadi.service.publishing.check.Check;

import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -104,25 +106,22 @@ public ResponseEntity postJsonEvents(@PathVariable final String eventTypeName,
produces = "application/json; charset=utf-8"
)
public ResponseEntity postBinaryEvents(@PathVariable final String eventTypeName,
@RequestBody final byte[] batch,
final HttpServletRequest request,
final Client client)
throws AccessDeniedException, BlockedException, ServiceTemporarilyUnavailableException,
InternalNakadiException, EventTypeTimeoutException, NoSuchEventTypeException {

// TODO: check that event type schema type is AVRO!

//try {
// return postBinaryEvents(eventTypeName, batch, request, client, false);
//} catch (IOException e) {
// throw new InternalNakadiException("failed to parse batch", e);
//}
return status(HttpStatus.NOT_IMPLEMENTED).body("the method is under development");
try {
return postBinaryEvents(eventTypeName, request.getInputStream(), client, false);
} catch (IOException e) {
throw new InternalNakadiException("failed to parse batch", e);
}
}

private ResponseEntity postBinaryEvents(final String eventTypeName,
final byte[] batch,
final HttpServletRequest request,
final InputStream batch,
final Client client,
final boolean delete) throws IOException {
TracingService.setOperationName("publish_events")
Expand All @@ -142,16 +141,14 @@ private ResponseEntity postBinaryEvents(final String eventTypeName,
try {
final long startingNanos = System.nanoTime();
try {
final int totalSizeBytes = batch.length;
TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));

// todo implement delete
// if (delete) {
// result = publisher.delete(eventsAsString, eventTypeName);
// }

final CountingInputStream countingInputStream = new CountingInputStream(batch);
final EventPublishResult result;
final List<NakadiRecord> nakadiRecords = nakadiRecordMapper.fromBytesBatch(batch);
final List<NakadiRecord> nakadiRecords = nakadiRecordMapper.fromBytesBatch(countingInputStream);
final List<NakadiRecordResult> recordResults = binaryPublisher
.publishWithChecks(eventType, nakadiRecords, prePublishingChecks);
if (recordResults.isEmpty()) {
Expand All @@ -162,6 +159,9 @@ private ResponseEntity postBinaryEvents(final String eventTypeName,

final int eventCount = result.getResponses().size();

final long totalSizeBytes = countingInputStream.getCount();
TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));

reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount);
reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client);

Expand Down Expand Up @@ -254,7 +254,7 @@ private ResponseEntity postEventInternal(final String eventTypeName,
}
}

private void reportSLOs(final long startingNanos, final int totalSizeBytes, final int eventCount,
private void reportSLOs(final long startingNanos, final long totalSizeBytes, final int eventCount,
final EventPublishResult eventPublishResult, final String eventTypeName,
final Client client) {
if (eventPublishResult.getStatus() == EventPublishingStatus.SUBMITTED) {
Expand All @@ -273,7 +273,7 @@ private void reportSLOs(final long startingNanos, final int totalSizeBytes, fina
}

private void reportMetrics(final EventTypeMetrics eventTypeMetrics, final EventPublishResult result,
final int totalSizeBytes, final int eventCount) {
final long totalSizeBytes, final int eventCount) {
if (result.getStatus() == EventPublishingStatus.SUBMITTED) {
eventTypeMetrics.reportSizing(eventCount, totalSizeBytes - eventCount - 1);
} else if (result.getStatus() == EventPublishingStatus.FAILED && eventCount != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.context.request.NativeWebRequest;
import org.zalando.nakadi.exceptions.runtime.AvroDecodingException;
import org.zalando.nakadi.exceptions.runtime.EnrichmentException;
import org.zalando.nakadi.exceptions.runtime.EventTypeTimeoutException;
import org.zalando.nakadi.exceptions.runtime.InvalidPartitionKeyFieldsException;
Expand All @@ -31,13 +32,27 @@ public ResponseEntity<Problem> handleEventTypeTimeoutException(final EventTypeTi
@ExceptionHandler(JSONException.class)
public ResponseEntity<Problem> handleJSONException(final JSONException exception,
                                                   final NativeWebRequest request) {
    // Delegate to the shared payload-error handler with a JSON-specific message prefix.
    final String prefix = "Error occurred when parsing event(s). ";
    return handlePayloadException(exception, prefix, request);
}

@ExceptionHandler(AvroDecodingException.class)
public ResponseEntity<Problem> handleAvroException(final AvroDecodingException exception,
                                                   final NativeWebRequest request) {
    // Delegate to the shared payload-error handler with an Avro-specific message prefix.
    final String prefix = "Error occurred when parsing avro. ";
    return handlePayloadException(exception, prefix, request);
}

private ResponseEntity<Problem> handlePayloadException(final Exception exception,
final String message,
final NativeWebRequest request) {
if (exception.getCause() == null) {
return create(Problem.valueOf(Status.BAD_REQUEST,
"Error occurred when parsing event(s). " + exception.getMessage()), request);
message + exception.getMessage()), request);
}

return create(Problem.valueOf(Status.BAD_REQUEST), request);
}


@ExceptionHandler({EnrichmentException.class,
PartitioningException.class,
InvalidPartitionKeyFieldsException.class})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.zalando.nakadi.service.publishing.BinaryEventPublisher;
import org.zalando.nakadi.service.publishing.EventPublisher;
import org.zalando.nakadi.service.publishing.NakadiKpiPublisher;
import org.zalando.nakadi.service.publishing.NakadiRecordMapper;
import org.zalando.nakadi.mapper.NakadiRecordMapper;
import org.zalando.nakadi.service.publishing.check.Check;
import org.zalando.nakadi.utils.TestUtils;

Expand Down
2 changes: 1 addition & 1 deletion app/src/main/resources/kpi_event_types.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"ordering_instance_ids": [],
"schema": {
"type": "avro_schema",
"schema": "{ \"name\": \"nakadi.batch.published\", \"type\": \"record\", \"doc\": \"Stores KPI event of type nakadi.batch.published\", \"fields\": [ { \"name\": \"event_type\", \"type\": \"string\" }, { \"name\": \"app\", \"type\": \"string\" }, { \"name\": \"app_hashed\", \"type\": \"string\" }, { \"name\": \"token_realm\", \"type\": \"string\" }, { \"name\": \"number_of_events\", \"type\": \"int\" }, { \"name\": \"ms_spent\", \"type\": \"long\" }, { \"name\": \"batch_size\", \"type\": \"int\" } ]}"
"schema": "{ \"name\": \"nakadi.batch.published\", \"type\": \"record\", \"doc\": \"Stores KPI event of type nakadi.batch.published\", \"fields\": [ { \"name\": \"event_type\", \"type\": \"string\" }, { \"name\": \"app\", \"type\": \"string\" }, { \"name\": \"app_hashed\", \"type\": \"string\" }, { \"name\": \"token_realm\", \"type\": \"string\" }, { \"name\": \"number_of_events\", \"type\": \"int\" }, { \"name\": \"ms_spent\", \"type\": \"long\" }, { \"name\": \"batch_size\", \"type\": \"long\" } ]}"
},
"default_statistic": {
"messages_per_minute": 100,
Expand Down
4 changes: 4 additions & 0 deletions checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
<!DOCTYPE module PUBLIC "-//Puppy Crawl//DTD Check Configuration 1.3//EN" "https://checkstyle.org/dtds/configuration_1_3.dtd">
<module name="Checker">

<module name="BeforeExecutionExclusionFileFilter">
<property name="fileNamePattern" value="/build/*.generated.*"/>
</module>

<module name="LineLength">
<property name="max" value="120"/>
</module>
Expand Down
26 changes: 25 additions & 1 deletion core-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ buildscript {
}
}

plugins {
id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}

apply plugin: 'groovy'
apply plugin: 'eclipse'
apply plugin: 'project-report'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'
apply plugin: 'org.owasp.dependencycheck'
apply plugin: "com.github.davidmc24.gradle.plugin.avro-base"

group 'org.zalando'

Expand Down Expand Up @@ -94,7 +99,7 @@ dependencies {
compile 'org.zalando:problem-spring-web:0.23.0'

// avro
compile "org.apache.avro:avro:1.10.2"
compile "org.apache.avro:avro:1.11.0"
compile "com.fasterxml.jackson.dataformat:jackson-dataformat-avro:2.8.5"

compile 'com.github.luben:zstd-jni:1.5.2-2'
Expand Down Expand Up @@ -139,3 +144,22 @@ bootJar {
jar {
enabled = true
}

// Code generation for the Avro classes used by the binary publishing API.
import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask

// Generate Java classes from the publishing-batch Avro schemas into build/generated/sources.
def generateAvro = tasks.register("generateAvro", GenerateAvroJavaTask) {
source("src/main/resources/avro-schema/batch.publishing")
outputDir = file("build/generated/sources")
}

// Feed the generated sources into the main Java compilation.
tasks.named("compileJava").configure {
source(generateAvro)
}

// Expose the generated sources alongside the hand-written code (IDEs, tooling).
sourceSets {
main {
java {
srcDirs = ["src/main/java", "build/generated/sources"]
}
}
}
2 changes: 1 addition & 1 deletion core-common/settings.gradle
Original file line number Diff line number Diff line change
@@ -1 +1 @@
rootProject.name = 'core-common'
rootProject.name = 'core-common'
Loading

0 comments on commit f8816a5

Please sign in to comment.