From f8816a55d99a28177c751d3744f3c00c579d5ee3 Mon Sep 17 00:00:00 2001
From: Andrey Dyachkov
Date: Thu, 16 Jun 2022 14:05:09 +0200
Subject: [PATCH] Avro batch format (#1430)

* Introduces the Avro batch format for the Nakadi Publishing API.
---
 .../repository/kafka/KafkaRepositoryAT.java   |   5 -
 .../nakadi/webservice/BinaryEndToEndAT.java   |  76 ++++++++--
 .../resources/nakadi.end2end.avsc             |  11 ++
 .../nakadi/EventPublishingController.java     |  32 ++--
 .../EventPublishingExceptionHandler.java      |  17 ++-
 .../EventPublishingControllerTest.java        |   2 +-
 app/src/main/resources/kpi_event_types.json   |   2 +-
 checkstyle.xml                                |   4 +
 core-common/build.gradle                      |  26 +++-
 core-common/settings.gradle                   |   2 +-
 .../nakadi/domain/NakadiAvroMetadata.java     |  91 -----------
 .../zalando/nakadi/domain/NakadiMetadata.java |  25 +--
 .../zalando/nakadi/domain/NakadiRecord.java   |  40 -----
 .../nakadi/domain/kpi/AccessLogEvent.java     |   2 +-
 .../domain/kpi/BatchPublishedEvent.java       |   8 +-
 .../nakadi/domain/kpi/DataStreamedEvent.java  |   2 +-
 .../nakadi/domain/kpi/EventTypeLogEvent.java  |   2 +-
 .../domain/kpi/SubscriptionLogEvent.java      |   2 +-
 .../MetadataEnrichmentStrategy.java           |   4 +-
 .../runtime/AvroDecodingException.java        |  11 ++
 .../runtime/NakadiRuntimeException.java       |   5 +
 .../nakadi/mapper/NakadiRecordMapper.java     | 140 +++++++++++++++++
 .../nakadi/metrics/EventTypeMetrics.java      |   2 +-
 .../repository/KafkaRepositoryCreator.java    |   9 +-
 .../kafka/KafkaTopicRepository.java           |  31 ++--
 .../repository/kafka/NakadiKafkaConsumer.java |   6 +-
 .../repository/kafka/RecordDeserializer.java  |   2 +-
 .../nakadi/service/LocalSchemaRegistry.java   |  24 +--
 .../batch.publishing/batch.publishing.0.avsc  | 136 +++++++++++++++++
 .../batch.publishing/batch.publishing.1.avsc  | 143 ++++++++++++++++++
 .../metadata/metadata.0.avsc                  |   0
 .../metadata/metadata.1.avsc                  |   0
 .../metadata/metadata.2.avsc                  |   0
 .../metadata/metadata.3.avsc                  |   0
 .../metadata/metadata.4.avsc                  |   0
 .../metadata/metadata.5.avsc                  |   0
 .../nakadi.access.log.0.avsc                  |   0
 .../nakadi.access.log.1.avsc                  |   0
 .../nakadi.batch.published.0.avsc             |   0
 .../nakadi.batch.published.1.avsc             |  35 +++++
 .../nakadi.data.streamed.0.avsc               |   0
 .../nakadi.event.type.log.0.avsc              |   0
 .../nakadi.subscription.log.0.avsc            |   0
 .../nakadi/domain/kpi/AccessLogEventTest.java |  10 +-
 .../domain/kpi/BatchPublishedEventTest.java   |   5 +-
 .../domain/kpi/DataStreamedEventTest.java     |  10 +-
 .../domain/kpi/EventTypeLogEventTest.java     |  10 +-
 .../domain/kpi/SubscriptionLogEventTest.java  |  10 +-
 .../nakadi/mapper/NakadiRecordMapperTest.java |  83 ++++++++++
 .../kafka/KafkaTopicRepositoryTest.java       |  33 ++--
 .../org/zalando/nakadi/utils/TestUtils.java   |  12 ++
 .../AvroDeserializerWithSequenceDecoder.java  |  18 +--
 .../kafka/KafkaRecordDeserializer.java        |  90 ++++++++++-
 .../publishing/BinaryEventProcessor.java      |   2 +-
 .../publishing/NakadiKpiPublisher.java        |  21 +--
 .../publishing/NakadiRecordMapper.java        |  72 ---------
 .../timeline/MultiTimelineEventConsumer.java  |   9 +-
 .../service/timeline/TimelineService.java     |  10 +-
 .../MetadataEnrichmentStrategyTest.java       |  16 +-
 .../kafka/KafkaRecordDeserializerTest.java    | 124 +++++++++++++--
 .../service/CursorOperationsServiceTest.java  |   4 -
 .../publishing/EventOwnerExtractorTest.java   |   5 +-
 .../publishing/EventPublisherTest.java        |  15 +-
 .../publishing/NakadiKpiPublisherTest.java    |  16 +-
 .../publishing/NakadiRecordMapperTest.java    |  83 ----------
 .../service/publishing/check/CheckTest.java   |   6 +-
 .../publishing/check/EnrichmentCheckTest.java |   4 +-
 .../service/timeline/TimelineServiceTest.java |   7 +-
 .../test/resources/test.deserialize.avro.avsc |  10 ++
 69 files changed, 1032 insertions(+), 550 deletions(-)
 create mode 100644 acceptance-test/src/acceptance-test/resources/nakadi.end2end.avsc
 delete mode 100644 core-common/src/main/java/org/zalando/nakadi/domain/NakadiAvroMetadata.java
 create mode 100644 core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/AvroDecodingException.java
 create mode 100644 core-common/src/main/java/org/zalando/nakadi/mapper/NakadiRecordMapper.java
 create mode 100644 core-common/src/main/resources/avro-schema/batch.publishing/batch.publishing.0.avsc
 create mode 100644 core-common/src/main/resources/avro-schema/batch.publishing/batch.publishing.1.avsc
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/metadata/metadata.0.avsc (100%)
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/metadata/metadata.1.avsc (100%)
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/metadata/metadata.2.avsc (100%)
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/metadata/metadata.3.avsc (100%)
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/metadata/metadata.4.avsc (100%)
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/metadata/metadata.5.avsc (100%)
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/nakadi.access.log/nakadi.access.log.0.avsc (100%)
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/nakadi.access.log/nakadi.access.log.1.avsc (100%)
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/nakadi.batch.published/nakadi.batch.published.0.avsc (100%)
 create mode 100644 core-common/src/main/resources/avro-schema/nakadi.batch.published/nakadi.batch.published.1.avsc
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/nakadi.data.streamed/nakadi.data.streamed.0.avsc (100%)
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/nakadi.event.type.log/nakadi.event.type.log.0.avsc (100%)
 rename core-common/src/main/resources/{event-type-schema => avro-schema}/nakadi.subscription.log/nakadi.subscription.log.0.avsc (100%)
 create mode 100644 core-common/src/test/java/org/zalando/nakadi/mapper/NakadiRecordMapperTest.java
 delete mode 100644 core-services/src/main/java/org/zalando/nakadi/service/publishing/NakadiRecordMapper.java
 delete mode 100644 core-services/src/test/java/org/zalando/nakadi/service/publishing/NakadiRecordMapperTest.java
 create mode 100644 core-services/src/test/resources/test.deserialize.avro.avsc

diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java
index b32368db82..aa8f55b7c3 100644
--- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java
+++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java
@@ -1,6 +1,5 @@
 package org.zalando.nakadi.repository.kafka;
 
-import com.codahale.metrics.MetricRegistry;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -78,7 +77,6 @@ public class KafkaRepositoryAT extends BaseAT {
 
     private NakadiSettings nakadiSettings;
     private KafkaSettings kafkaSettings;
-    private ZookeeperSettings zookeeperSettings;
    private KafkaTestHelper kafkaHelper;
     private KafkaTopicRepository kafkaTopicRepository;
     private NakadiTopicConfig defaultTopicConfig;
@@ -110,7 +108,6 @@ public void setup() {
         kafkaSettings = new KafkaSettings(KAFKA_RETRIES, KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY,
                 KAFKA_LINGER_MS, KAFKA_ENABLE_AUTO_COMMIT, KAFKA_MAX_REQUEST_SIZE,
                 KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT, "", KAFKA_COMPRESSION_TYPE);
-        zookeeperSettings = new ZookeeperSettings(ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZK_MAX_IN_FLIGHT_REQUESTS);
         kafkaHelper = new KafkaTestHelper(KAFKA_URL);
         defaultTopicConfig = new NakadiTopicConfig(DEFAULT_PARTITION_COUNT, DEFAULT_CLEANUP_POLICY,
                 Optional.of(DEFAULT_RETENTION_TIME));
@@ -290,10 +287,8 @@ private KafkaTopicRepository createKafkaTopicRepository() {
                 .setKafkaFactory(factory)
                 .setNakadiSettings(nakadiSettings)
                 .setKafkaSettings(kafkaSettings)
-                .setZookeeperSettings(zookeeperSettings)
                 .setKafkaTopicConfigFactory(kafkaTopicConfigFactory)
                 .setKafkaLocationManager(kafkaLocationManager)
-                .setMetricRegistry(new MetricRegistry())
                 .build();
     }
 
diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java
index 9d744ddd33..4cbc321ebc 100644
--- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java
+++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/BinaryEndToEndAT.java
@@ -1,42 +1,94 @@
 package org.zalando.nakadi.webservice;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import org.junit.Ignore;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.EncoderFactory;
+import org.junit.Assert;
 import org.junit.Test;
+import org.springframework.core.io.DefaultResourceLoader;
 import org.zalando.nakadi.domain.EnrichmentStrategyDescriptor;
 import org.zalando.nakadi.domain.EventCategory;
+import org.zalando.nakadi.domain.EventTypeSchema;
+import org.zalando.nakadi.domain.EventTypeSchemaBase;
+import org.zalando.nakadi.domain.Subscription;
+import org.zalando.nakadi.generated.avro.Envelope;
+import org.zalando.nakadi.generated.avro.Metadata;
+import org.zalando.nakadi.generated.avro.PublishingBatch;
 import org.zalando.nakadi.utils.EventTypeTestBuilder;
+import org.zalando.nakadi.utils.RandomSubscriptionBuilder;
+import org.zalando.nakadi.utils.TestUtils;
 import org.zalando.nakadi.webservice.utils.NakadiTestUtils;
+import org.zalando.nakadi.webservice.utils.TestStreamingClient;
 
-import java.util.Base64;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
 import java.util.List;
+import java.util.Map;
 
 import static com.jayway.restassured.RestAssured.given;
+import static org.zalando.nakadi.domain.SubscriptionBase.InitialPosition.BEGIN;
+import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;
 
 public class BinaryEndToEndAT extends BaseAT {
     private static final String TEST_ET_NAME = "nakadi.test-2022-05-06.et";
-    private static final String TEST_DATA = "BAAAAGO0v/qJk2BIZjU5ZmVlNTQtMmNhYy00MTAzLWI4NTItOGMwOGRiZjhlNjEyAhJ0ZX" +
-            "N0LWZsb3cAAjECEnRlc3QtdXNlcjJuYWthZGkudGVzdC0yMDIyLTA1LTA2LmV0AAAAAAAAAABBCFBPU1QYL2V2ZW50LXR5cGVzAB50ZX" +
-            "N0LXVzZXItYWdlbnQMbmFrYWRpFGhhc2hlZC1hcHCSAxQCLQQtLfYBggU=";
 
     @Test
-    @Ignore
-    public void testAvroPublishing() throws JsonProcessingException {
+    public void testAvroPublishingAndJsonConsumption() throws IOException {
+        final Schema schema = new Schema.Parser().parse(new DefaultResourceLoader()
+                .getResource("nakadi.end2end.avsc").getInputStream());
         final var et = EventTypeTestBuilder.builder()
                 .name(TEST_ET_NAME)
                 .category(EventCategory.BUSINESS)
                 .enrichmentStrategies(List.of(EnrichmentStrategyDescriptor.METADATA_ENRICHMENT))
+                .schema(new EventTypeSchema(new EventTypeSchemaBase(
+                        EventTypeSchemaBase.Type.AVRO_SCHEMA,
+                        schema.toString()), "1.0.0", TestUtils.randomDate()))
                 .build();
         NakadiTestUtils.createEventTypeInNakadi(et);
 
-        final byte[] body = Base64.getDecoder().decode(TEST_DATA);
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        new GenericDatumWriter(schema).write(
+                new GenericRecordBuilder(schema).set("foo", "bar").build(),
+                EncoderFactory.get().directBinaryEncoder(baos, null));
+        final PublishingBatch batch = PublishingBatch.newBuilder()
+                .setEvents(List.of(Envelope.newBuilder()
+                        .setMetadata(Metadata.newBuilder()
+                                .setEventType(TEST_ET_NAME)
+                                .setVersion("1.0.0")
+                                .setOccurredAt(Instant.now())
+                                .setEid("CE8C9EBC-3F19-4B9D-A453-08AD2EDA6028")
+                                .build())
+                        .setPayload(ByteBuffer.wrap(baos.toByteArray()))
+                        .build()))
+                .build();
+
+        final ByteBuffer body = PublishingBatch.getEncoder().encode(batch);
         final var response = given()
-                .contentType("application/avro-binary; charset=utf-8")
-                .body(body)
+                .contentType("application/avro-binary")
+                .body(body.array())
                 .post(String.format("/event-types/%s/events", TEST_ET_NAME));
         response.print();
         response.then().statusCode(200);
-        // TODO add the consumption side once schema creation is done.
+
+        // check event is consumed and format is correct
+        final Subscription subscription = createSubscription(
+                RandomSubscriptionBuilder.builder()
+                        .withEventType(TEST_ET_NAME)
+                        .withStartFrom(BEGIN)
+                        .buildSubscriptionBase());
+        final TestStreamingClient client = TestStreamingClient.create(subscription.getId()).start();
+
+        TestUtils.waitFor(() -> Assert.assertEquals(1, client.getBatches().size()));
+        final Map event = client.getBatches().get(0).getEvents().get(0);
+        Assert.assertEquals("bar", event.get("foo"));
+
+        final Map metadata = (Map) event.get("metadata");
+        Assert.assertEquals("CE8C9EBC-3F19-4B9D-A453-08AD2EDA6028", metadata.get("eid"));
+        Assert.assertEquals("1.0.0", metadata.get("version"));
+        Assert.assertEquals(TEST_ET_NAME, metadata.get("event_type"));
     }
 }
diff --git a/acceptance-test/src/acceptance-test/resources/nakadi.end2end.avsc b/acceptance-test/src/acceptance-test/resources/nakadi.end2end.avsc
new file mode 100644
index 0000000000..d510cc3b71
--- /dev/null
+++ b/acceptance-test/src/acceptance-test/resources/nakadi.end2end.avsc
@@ -0,0 +1,11 @@
+{
+    "name": "TestEnd2End",
+    "namespace": "org.zalando.nakadi.generated.avro",
+    "type": "record",
+    "fields": [
+        {
+            "name": "foo",
+            "type": "string"
+        }
+    ]
+}
\ No newline at end of file
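[Illustrative sketch, not part of the patch: the acceptance test above is the canonical publishing flow; the snippet below shows the same wire format from a plain client's point of view, assuming the classes generated from batch.publishing.1.avsc and a hypothetical Nakadi base URL (nakadi.example.org). The payload is left empty here; a real client would put the Avro-encoded business event into it, and Nakadi may well reject an empty payload.]

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.nio.ByteBuffer;
    import java.time.Instant;
    import java.util.List;
    import java.util.UUID;
    import org.zalando.nakadi.generated.avro.Envelope;
    import org.zalando.nakadi.generated.avro.Metadata;
    import org.zalando.nakadi.generated.avro.PublishingBatch;

    public class AvroBatchPublishExample {
        public static void main(final String[] args) throws Exception {
            // One envelope per event: metadata plus the Avro-encoded payload bytes.
            final PublishingBatch batch = PublishingBatch.newBuilder()
                    .setEvents(List.of(Envelope.newBuilder()
                            .setMetadata(Metadata.newBuilder()
                                    .setEventType("nakadi.test-2022-05-06.et")
                                    .setVersion("1.0.0")
                                    .setOccurredAt(Instant.now())
                                    .setEid(UUID.randomUUID().toString())
                                    .build())
                            .setPayload(ByteBuffer.wrap(new byte[0])) // real payload: Avro-encoded event
                            .build()))
                    .build();
            // Avro single-object encoding: 2-byte magic + 8-byte schema fingerprint + body.
            final ByteBuffer body = PublishingBatch.getEncoder().encode(batch);
            final HttpResponse<String> response = HttpClient.newHttpClient().send(
                    HttpRequest.newBuilder()
                            .uri(URI.create("https://nakadi.example.org/event-types/nakadi.test-2022-05-06.et/events"))
                            .header("Content-Type", "application/avro-binary")
                            .POST(HttpRequest.BodyPublishers.ofByteArray(body.array()))
                            .build(),
                    HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }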
diff --git a/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java b/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java
index 5c8e7edb11..d65dab4997 100644
--- a/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java
+++ b/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java
@@ -1,6 +1,7 @@
 package org.zalando.nakadi;
 
 import com.google.common.base.Charsets;
+import com.google.common.io.CountingInputStream;
 import io.opentracing.tag.Tags;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
@@ -23,6 +24,7 @@
 import org.zalando.nakadi.exceptions.runtime.InternalNakadiException;
 import org.zalando.nakadi.exceptions.runtime.NoSuchEventTypeException;
 import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException;
+import org.zalando.nakadi.mapper.NakadiRecordMapper;
 import org.zalando.nakadi.metrics.EventTypeMetricRegistry;
 import org.zalando.nakadi.metrics.EventTypeMetrics;
 import org.zalando.nakadi.security.Client;
@@ -32,11 +34,11 @@
 import org.zalando.nakadi.service.publishing.BinaryEventPublisher;
 import org.zalando.nakadi.service.publishing.EventPublisher;
 import org.zalando.nakadi.service.publishing.NakadiKpiPublisher;
-import org.zalando.nakadi.service.publishing.NakadiRecordMapper;
 import org.zalando.nakadi.service.publishing.check.Check;
 
 import javax.servlet.http.HttpServletRequest;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -104,7 +106,6 @@ public ResponseEntity postJsonEvents(@PathVariable final String eventTypeName,
             produces = "application/json; charset=utf-8"
     )
     public ResponseEntity postBinaryEvents(@PathVariable final String eventTypeName,
-                                           @RequestBody final byte[] batch,
                                            final HttpServletRequest request,
                                            final Client client)
             throws AccessDeniedException, BlockedException, ServiceTemporarilyUnavailableException,
@@ -112,17 +113,15 @@ public ResponseEntity postBinaryEvents(@PathVariable final String eventTypeName,
 
         // TODO: check that event type schema type is AVRO!
 
-        //try {
-        //    return postBinaryEvents(eventTypeName, batch, request, client, false);
-        //} catch (IOException e) {
-        //    throw new InternalNakadiException("failed to parse batch", e);
-        //}
-        return status(HttpStatus.NOT_IMPLEMENTED).body("the method is under development");
+        try {
+            return postBinaryEvents(eventTypeName, request.getInputStream(), client, false);
+        } catch (IOException e) {
+            throw new InternalNakadiException("failed to parse batch", e);
+        }
     }
 
     private ResponseEntity postBinaryEvents(final String eventTypeName,
-                                            final byte[] batch,
-                                            final HttpServletRequest request,
+                                            final InputStream batch,
                                             final Client client,
                                             final boolean delete) throws IOException {
         TracingService.setOperationName("publish_events")
@@ -142,16 +141,14 @@ private ResponseEntity postBinaryEvents(final String eventTypeName,
         try {
             final long startingNanos = System.nanoTime();
             try {
-                final int totalSizeBytes = batch.length;
-                TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));
-
                 // todo implement delete
                 // if (delete) {
                 //     result = publisher.delete(eventsAsString, eventTypeName);
                 // }
 
+                final CountingInputStream countingInputStream = new CountingInputStream(batch);
                 final EventPublishResult result;
-                final List nakadiRecords = nakadiRecordMapper.fromBytesBatch(batch);
+                final List nakadiRecords = nakadiRecordMapper.fromBytesBatch(countingInputStream);
                 final List recordResults = binaryPublisher
                         .publishWithChecks(eventType, nakadiRecords, prePublishingChecks);
                 if (recordResults.isEmpty()) {
@@ -162,6 +159,9 @@ private ResponseEntity postBinaryEvents(final String eventTypeName,
 
                 final int eventCount = result.getResponses().size();
 
+                final long totalSizeBytes = countingInputStream.getCount();
+                TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));
+
                 reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount);
                 reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client);
 
@@ -254,7 +254,7 @@ private ResponseEntity postEventInternal(final String eventTypeName,
         }
     }
 
-    private void reportSLOs(final long startingNanos, final int totalSizeBytes, final int eventCount,
+    private void reportSLOs(final long startingNanos, final long totalSizeBytes, final int eventCount,
                             final EventPublishResult eventPublishResult, final String eventTypeName,
                             final Client client) {
         if (eventPublishResult.getStatus() == EventPublishingStatus.SUBMITTED) {
@@ -273,7 +273,7 @@ private void reportSLOs(final long startingNanos, final int totalSizeBytes, fina
     }
 
     private void reportMetrics(final EventTypeMetrics eventTypeMetrics, final EventPublishResult result,
-                               final int totalSizeBytes, final int eventCount) {
+                               final long totalSizeBytes, final int eventCount) {
         if (result.getStatus() == EventPublishingStatus.SUBMITTED) {
             eventTypeMetrics.reportSizing(eventCount, totalSizeBytes - eventCount - 1);
         } else if (result.getStatus() == EventPublishingStatus.FAILED && eventCount != 0) {
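[Illustrative sketch, not part of the patch: the controller now reads the request body as a stream instead of a pre-buffered byte[], so the batch size is only known after the Avro decoder has consumed the stream — which is why the slo_bucket tag and the sizing metrics moved to after publishing. A minimal, standalone demonstration of the Guava wrapper used for this:]

    import com.google.common.io.CountingInputStream;
    import java.io.ByteArrayInputStream;

    public class CountingStreamDemo {
        public static void main(final String[] args) throws Exception {
            final CountingInputStream in =
                    new CountingInputStream(new ByteArrayInputStream(new byte[]{1, 2, 3, 4, 5}));
            in.readAllBytes();                 // the batch decoder consumes the stream here
            System.out.println(in.getCount()); // 5 -- bytes actually read, fed into metrics/SLOs
        }
    }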
" + exception.getMessage()), request); + message + exception.getMessage()), request); } + return create(Problem.valueOf(Status.BAD_REQUEST), request); } + @ExceptionHandler({EnrichmentException.class, PartitioningException.class, InvalidPartitionKeyFieldsException.class}) diff --git a/api-publishing/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java b/api-publishing/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java index 9490943421..492e6840f9 100644 --- a/api-publishing/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java +++ b/api-publishing/src/test/java/org/zalando/nakadi/controller/EventPublishingControllerTest.java @@ -37,7 +37,7 @@ import org.zalando.nakadi.service.publishing.BinaryEventPublisher; import org.zalando.nakadi.service.publishing.EventPublisher; import org.zalando.nakadi.service.publishing.NakadiKpiPublisher; -import org.zalando.nakadi.service.publishing.NakadiRecordMapper; +import org.zalando.nakadi.mapper.NakadiRecordMapper; import org.zalando.nakadi.service.publishing.check.Check; import org.zalando.nakadi.utils.TestUtils; diff --git a/app/src/main/resources/kpi_event_types.json b/app/src/main/resources/kpi_event_types.json index dbfec43d1e..0474d85773 100644 --- a/app/src/main/resources/kpi_event_types.json +++ b/app/src/main/resources/kpi_event_types.json @@ -13,7 +13,7 @@ "ordering_instance_ids": [], "schema": { "type": "avro_schema", - "schema": "{ \"name\": \"nakadi.batch.published\", \"type\": \"record\", \"doc\": \"Stores KPI event of type nakadi.batch.published\", \"fields\": [ { \"name\": \"event_type\", \"type\": \"string\" }, { \"name\": \"app\", \"type\": \"string\" }, { \"name\": \"app_hashed\", \"type\": \"string\" }, { \"name\": \"token_realm\", \"type\": \"string\" }, { \"name\": \"number_of_events\", \"type\": \"int\" }, { \"name\": \"ms_spent\", \"type\": \"long\" }, { \"name\": \"batch_size\", \"type\": \"int\" } ]}" + "schema": "{ \"name\": \"nakadi.batch.published\", \"type\": \"record\", \"doc\": \"Stores KPI event of type nakadi.batch.published\", \"fields\": [ { \"name\": \"event_type\", \"type\": \"string\" }, { \"name\": \"app\", \"type\": \"string\" }, { \"name\": \"app_hashed\", \"type\": \"string\" }, { \"name\": \"token_realm\", \"type\": \"string\" }, { \"name\": \"number_of_events\", \"type\": \"int\" }, { \"name\": \"ms_spent\", \"type\": \"long\" }, { \"name\": \"batch_size\", \"type\": \"long\" } ]}" }, "default_statistic": { "messages_per_minute": 100, diff --git a/checkstyle.xml b/checkstyle.xml index 38abf19cf1..ce250a60f2 100644 --- a/checkstyle.xml +++ b/checkstyle.xml @@ -2,6 +2,10 @@ + + + + diff --git a/core-common/build.gradle b/core-common/build.gradle index f0c9eb26d4..bd8bd6e8fe 100644 --- a/core-common/build.gradle +++ b/core-common/build.gradle @@ -6,12 +6,17 @@ buildscript { } } +plugins { + id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0" +} + apply plugin: 'groovy' apply plugin: 'eclipse' apply plugin: 'project-report' apply plugin: 'org.springframework.boot' apply plugin: 'io.spring.dependency-management' apply plugin: 'org.owasp.dependencycheck' +apply plugin: "com.github.davidmc24.gradle.plugin.avro-base" group 'org.zalando' @@ -94,7 +99,7 @@ dependencies { compile 'org.zalando:problem-spring-web:0.23.0' // avro - compile "org.apache.avro:avro:1.10.2" + compile "org.apache.avro:avro:1.11.0" compile "com.fasterxml.jackson.dataformat:jackson-dataformat-avro:2.8.5" compile 'com.github.luben:zstd-jni:1.5.2-2' @@ -139,3 
+144,22 @@ bootJar { jar { enabled = true } + +import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask + +def generateAvro = tasks.register("generateAvro", GenerateAvroJavaTask) { + source("src/main/resources/avro-schema/batch.publishing") + outputDir = file("build/generated/sources") +} + +tasks.named("compileJava").configure { + source(generateAvro) +} + +sourceSets { + main { + java { + srcDirs = ["src/main/java", "build/generated/sources"] + } + } +} diff --git a/core-common/settings.gradle b/core-common/settings.gradle index 811c771a56..2d5786eaf9 100644 --- a/core-common/settings.gradle +++ b/core-common/settings.gradle @@ -1 +1 @@ -rootProject.name = 'core-common' +rootProject.name = 'core-common' \ No newline at end of file diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/NakadiAvroMetadata.java b/core-common/src/main/java/org/zalando/nakadi/domain/NakadiAvroMetadata.java deleted file mode 100644 index 41f459d69a..0000000000 --- a/core-common/src/main/java/org/zalando/nakadi/domain/NakadiAvroMetadata.java +++ /dev/null @@ -1,91 +0,0 @@ -package org.zalando.nakadi.domain; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericRecordBuilder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.EncoderFactory; -import org.zalando.nakadi.util.GenericRecordWrapper; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.OutputStream; - -public class NakadiAvroMetadata extends NakadiMetadata { - public static final String EID = "eid"; - public static final String OCCURRED_AT = "occurred_at"; - public static final String PUBLISHED_BY = "published_by"; - public static final String RECEIVED_AT = "received_at"; - public static final String EVENT_TYPE = "event_type"; - public static final String FLOW_ID = "flow_id"; - public static final String SCHEMA_VERSION = "version"; - public static final String PARTITION = "partition"; - public static final String PARTITION_KEYS = "partition_keys"; - public static final String PARTITION_COMPACTION_KEY = "partition_compaction_key"; - public static final String PARENT_EIDS = "parent_eids"; - public static final String SPAN_CTX = "span_ctx"; - - private final Schema metadataAvroSchema; - - public NakadiAvroMetadata( - final byte metadataVersion, - final Schema metadataAvroSchema) { - super(metadataVersion); - this.metadataAvroSchema = metadataAvroSchema; - } - - public NakadiAvroMetadata( - final byte metadataVersion, - final Schema metadataAvroSchema, - final byte[] data) throws IOException { - - super(metadataVersion); - this.metadataAvroSchema = metadataAvroSchema; - - final GenericDatumReader datumReader = new GenericDatumReader(metadataAvroSchema); - final GenericRecord genericRecord = (GenericRecord) datumReader.read(null, - DecoderFactory.get().directBinaryDecoder( - new ByteArrayInputStream(data), null)); - final GenericRecordWrapper wrapper = new GenericRecordWrapper(genericRecord); - - this.setEid(wrapper.getString(EID)); - this.setEventType(wrapper.getString(EVENT_TYPE)); - this.setPartition(wrapper.getString(PARTITION)); - this.setOccurredAt(wrapper.getLong(OCCURRED_AT)); - this.setPublishedBy(wrapper.getString(PUBLISHED_BY)); - this.setReceivedAt(wrapper.getLong(RECEIVED_AT)); - this.setFlowId(wrapper.getString(FLOW_ID)); - this.setSchemaVersion(wrapper.getString(SCHEMA_VERSION)); - 
diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/NakadiAvroMetadata.java b/core-common/src/main/java/org/zalando/nakadi/domain/NakadiAvroMetadata.java
deleted file mode 100644
index 41f459d69a..0000000000
--- a/core-common/src/main/java/org/zalando/nakadi/domain/NakadiAvroMetadata.java
+++ /dev/null
@@ -1,91 +0,0 @@
-package org.zalando.nakadi.domain;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.zalando.nakadi.util.GenericRecordWrapper;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class NakadiAvroMetadata extends NakadiMetadata {
-
-    public static final String EID = "eid";
-    public static final String OCCURRED_AT = "occurred_at";
-    public static final String PUBLISHED_BY = "published_by";
-    public static final String RECEIVED_AT = "received_at";
-    public static final String EVENT_TYPE = "event_type";
-    public static final String FLOW_ID = "flow_id";
-    public static final String SCHEMA_VERSION = "version";
-    public static final String PARTITION = "partition";
-    public static final String PARTITION_KEYS = "partition_keys";
-    public static final String PARTITION_COMPACTION_KEY = "partition_compaction_key";
-    public static final String PARENT_EIDS = "parent_eids";
-    public static final String SPAN_CTX = "span_ctx";
-
-    private final Schema metadataAvroSchema;
-
-    public NakadiAvroMetadata(
-            final byte metadataVersion,
-            final Schema metadataAvroSchema) {
-        super(metadataVersion);
-        this.metadataAvroSchema = metadataAvroSchema;
-    }
-
-    public NakadiAvroMetadata(
-            final byte metadataVersion,
-            final Schema metadataAvroSchema,
-            final byte[] data) throws IOException {
-
-        super(metadataVersion);
-        this.metadataAvroSchema = metadataAvroSchema;
-
-        final GenericDatumReader datumReader = new GenericDatumReader(metadataAvroSchema);
-        final GenericRecord genericRecord = (GenericRecord) datumReader.read(null,
-                DecoderFactory.get().directBinaryDecoder(
-                        new ByteArrayInputStream(data), null));
-        final GenericRecordWrapper wrapper = new GenericRecordWrapper(genericRecord);
-
-        this.setEid(wrapper.getString(EID));
-        this.setEventType(wrapper.getString(EVENT_TYPE));
-        this.setPartition(wrapper.getString(PARTITION));
-        this.setOccurredAt(wrapper.getLong(OCCURRED_AT));
-        this.setPublishedBy(wrapper.getString(PUBLISHED_BY));
-        this.setReceivedAt(wrapper.getLong(RECEIVED_AT));
-        this.setFlowId(wrapper.getString(FLOW_ID));
-        this.setSchemaVersion(wrapper.getString(SCHEMA_VERSION));
-        this.setPartitionKeys(wrapper.getListOfStrings(PARTITION_KEYS));
-        this.setPartitionCompactionKey(wrapper.getString(PARTITION_COMPACTION_KEY));
-        this.setParentEids(wrapper.getListOfStrings(PARENT_EIDS));
-        this.setSpanCtx(wrapper.getString(SPAN_CTX));
-    }
-
-    public Schema getMetadataAvroSchema() {
-        return metadataAvroSchema;
-    }
-
-    @Override
-    public void write(final OutputStream outputStream) throws IOException {
-        final var metadata = new GenericRecordBuilder(metadataAvroSchema)
-                .set(EID, getEid())
-                .set(EVENT_TYPE, getEventType())
-                .set(PARTITION, getPartition())
-                .set(OCCURRED_AT, getOccurredAt())
-                .set(PUBLISHED_BY, getPublishedBy())
-                .set(RECEIVED_AT, getReceivedAt())
-                .set(FLOW_ID, getFlowId())
-                .set(SCHEMA_VERSION, getSchemaVersion())
-                .set(PARTITION_KEYS, getPartitionKeys())
-                .set(PARTITION_COMPACTION_KEY, getPartitionCompactionKey())
-                .set(PARENT_EIDS, getParentEids())
-                .set(SPAN_CTX, getSpanCtx())
-                .build();
-        final GenericDatumWriter eventWriter = new GenericDatumWriter(metadataAvroSchema);
-        eventWriter.write(metadata, EncoderFactory.get()
-                .directBinaryEncoder(outputStream, null));
-    }
-}
diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/NakadiMetadata.java b/core-common/src/main/java/org/zalando/nakadi/domain/NakadiMetadata.java
index ae7ebe588a..e12b20e561 100644
--- a/core-common/src/main/java/org/zalando/nakadi/domain/NakadiMetadata.java
+++ b/core-common/src/main/java/org/zalando/nakadi/domain/NakadiMetadata.java
@@ -1,17 +1,16 @@
 package org.zalando.nakadi.domain;
 
+import java.time.Instant;
 import java.util.List;
 
-public abstract class NakadiMetadata implements EnvelopeHolder.EventWriter {
-
-    private final byte metadataVersion;
+public class NakadiMetadata {
 
     private String eid;
-    private Long occurredAt;
+    private Instant occurredAt;
     private String eventType;
     private String partition;
     private String publishedBy;
-    private Long receivedAt;
+    private Instant receivedAt;
     private String flowId;
     private String spanCtx;
     private String schemaVersion;
@@ -20,14 +19,6 @@ public abstract class NakadiMetadata implements EnvelopeHolder.EventWriter {
     private String partitionCompactionKey;
     private String eventOwner;
 
-    public NakadiMetadata(final byte metadataVersion) {
-        this.metadataVersion = metadataVersion;
-    }
-
-    public byte getMetadataVersion() {
-        return metadataVersion;
-    }
-
     public String getEid() {
         return eid;
     }
@@ -37,11 +28,11 @@ public NakadiMetadata setEid(final String eid) {
         return this;
     }
 
-    public Long getOccurredAt() {
+    public Instant getOccurredAt() {
         return occurredAt;
     }
 
-    public NakadiMetadata setOccurredAt(final Long occurredAt) {
+    public NakadiMetadata setOccurredAt(final Instant occurredAt) {
         this.occurredAt = occurredAt;
         return this;
     }
@@ -73,11 +64,11 @@ public NakadiMetadata setPublishedBy(final String publishedBy) {
         return this;
     }
 
-    public Long getReceivedAt() {
+    public Instant getReceivedAt() {
         return receivedAt;
     }
 
-    public NakadiMetadata setReceivedAt(final Long receivedAt) {
+    public NakadiMetadata setReceivedAt(final Instant receivedAt) {
         this.receivedAt = receivedAt;
         return this;
     }
diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/NakadiRecord.java b/core-common/src/main/java/org/zalando/nakadi/domain/NakadiRecord.java
index 60bfed3f32..34b6f9aa8d 100644
--- a/core-common/src/main/java/org/zalando/nakadi/domain/NakadiRecord.java
+++ b/core-common/src/main/java/org/zalando/nakadi/domain/NakadiRecord.java
@@ -1,30 +1,9 @@
 package org.zalando.nakadi.domain;
 
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import java.io.IOException;
-
 public class NakadiRecord {
 
-    public static final String HEADER_FORMAT = new String(new byte[]{0});
-
-    public enum Format {
-        AVRO(new byte[]{0});
-
-        private final byte[] format;
-
-        Format(final byte[] format) {
-            this.format = format;
-        }
-
-        public byte[] getFormat() {
-            return this.format;
-        }
-    }
-
     private byte[] eventKey;
     private byte[] payload;
-    private byte[] format;
     private EventOwnerHeader owner;
     private NakadiMetadata metadata;
 
@@ -46,15 +25,6 @@ public NakadiRecord setPayload(final byte[] payload) {
         return this;
     }
 
-    public byte[] getFormat() {
-        return format;
-    }
-
-    public NakadiRecord setFormat(final byte[] format) {
-        this.format = format;
-        return this;
-    }
-
     public EventOwnerHeader getOwner() {
         return owner;
     }
@@ -73,14 +43,4 @@ public NakadiRecord setMetadata(final NakadiMetadata metadata) {
         return this;
     }
 
-    public ProducerRecord toProducerRecord(final String topic) throws IOException {
-
-        final var partition = metadata.getPartition();
-        final var partitionInt = (partition != null) ? Integer.valueOf(partition) : null;
-
-        final var eventData = EnvelopeHolder.produceBytes(
-                metadata.getMetadataVersion(), metadata, os -> os.write(payload));
-
-        return new ProducerRecord<>(topic, partitionInt, eventKey, eventData);
-    }
 }
diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/kpi/AccessLogEvent.java b/core-common/src/main/java/org/zalando/nakadi/domain/kpi/AccessLogEvent.java
index bdadab5bca..dd25822bfe 100644
--- a/core-common/src/main/java/org/zalando/nakadi/domain/kpi/AccessLogEvent.java
+++ b/core-common/src/main/java/org/zalando/nakadi/domain/kpi/AccessLogEvent.java
@@ -5,7 +5,7 @@
 
 public class AccessLogEvent extends KPIEvent {
 
-    private static final String PATH_SCHEMA = "event-type-schema/nakadi.access.log/nakadi.access.log.1.avsc";
+    private static final String PATH_SCHEMA = "avro-schema/nakadi.access.log/nakadi.access.log.1.avsc";
     private static final Schema SCHEMA = loadSchema(PATH_SCHEMA);
 
     @KPIField("method")
diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/kpi/BatchPublishedEvent.java b/core-common/src/main/java/org/zalando/nakadi/domain/kpi/BatchPublishedEvent.java
index bfb81ae5e3..f8ac4f0e70 100644
--- a/core-common/src/main/java/org/zalando/nakadi/domain/kpi/BatchPublishedEvent.java
+++ b/core-common/src/main/java/org/zalando/nakadi/domain/kpi/BatchPublishedEvent.java
@@ -5,7 +5,7 @@
 
 public class BatchPublishedEvent extends KPIEvent {
 
-    private static final String PATH_SCHEMA = "event-type-schema/nakadi.batch.published/nakadi.batch.published.0.avsc";
+    private static final String PATH_SCHEMA = "avro-schema/nakadi.batch.published/nakadi.batch.published.1.avsc";
     private static final Schema SCHEMA = loadSchema(PATH_SCHEMA);
 
     @KPIField("event_type")
@@ -21,7 +21,7 @@ public class BatchPublishedEvent extends KPIEvent {
     @KPIField("ms_spent")
     private long msSpent;
     @KPIField("batch_size")
-    private int totalSizeBytes;
+    private long totalSizeBytes;
 
     public BatchPublishedEvent() {
         super(KPIEventTypes.BATCH_PUBLISHED);
@@ -81,11 +81,11 @@ public BatchPublishedEvent setMsSpent(final long msSpent) {
         return this;
     }
 
-    public int getTotalSizeBytes() {
+    public long getTotalSizeBytes() {
         return totalSizeBytes;
     }
 
-    public BatchPublishedEvent setTotalSizeBytes(final int totalSizeBytes) {
+    public BatchPublishedEvent setTotalSizeBytes(final long totalSizeBytes) {
         this.totalSizeBytes = totalSizeBytes;
         return this;
     }
diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/kpi/DataStreamedEvent.java b/core-common/src/main/java/org/zalando/nakadi/domain/kpi/DataStreamedEvent.java
index dae74e5375..848389b17f 100644
--- a/core-common/src/main/java/org/zalando/nakadi/domain/kpi/DataStreamedEvent.java
+++ b/core-common/src/main/java/org/zalando/nakadi/domain/kpi/DataStreamedEvent.java
@@ -5,7 +5,7 @@
 
 public class DataStreamedEvent extends KPIEvent {
 
-    private static final String PATH_SCHEMA = "event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc";
+    private static final String PATH_SCHEMA = "avro-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc";
     private static final Schema SCHEMA = loadSchema(PATH_SCHEMA);
 
     @KPIField("event_type")
diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/kpi/EventTypeLogEvent.java b/core-common/src/main/java/org/zalando/nakadi/domain/kpi/EventTypeLogEvent.java
index d96d06a799..44c60b9ce4 100644
--- a/core-common/src/main/java/org/zalando/nakadi/domain/kpi/EventTypeLogEvent.java
+++ b/core-common/src/main/java/org/zalando/nakadi/domain/kpi/EventTypeLogEvent.java
@@ -5,7 +5,7 @@
 
 public class EventTypeLogEvent extends KPIEvent {
 
-    private static final String PATH_SCHEMA = "event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc";
+    private static final String PATH_SCHEMA = "avro-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc";
     private static final Schema SCHEMA = loadSchema(PATH_SCHEMA);
 
     @KPIField("event_type")
diff --git a/core-common/src/main/java/org/zalando/nakadi/domain/kpi/SubscriptionLogEvent.java b/core-common/src/main/java/org/zalando/nakadi/domain/kpi/SubscriptionLogEvent.java
index c6dba1f557..1610f707fa 100644
--- a/core-common/src/main/java/org/zalando/nakadi/domain/kpi/SubscriptionLogEvent.java
+++ b/core-common/src/main/java/org/zalando/nakadi/domain/kpi/SubscriptionLogEvent.java
@@ -6,7 +6,7 @@
 public class SubscriptionLogEvent extends KPIEvent {
 
     private static final String PATH_SCHEMA =
-            "event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc";
+            "avro-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc";
     private static final Schema SCHEMA = loadSchema(PATH_SCHEMA);
 
     @KPIField("subscription_id")
diff --git a/core-common/src/main/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategy.java b/core-common/src/main/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategy.java
index 0138393ff3..cb1074fc3b 100644
--- a/core-common/src/main/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategy.java
+++ b/core-common/src/main/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategy.java
@@ -15,6 +15,7 @@
 import org.zalando.nakadi.plugin.api.authz.Subject;
 import org.zalando.nakadi.util.FlowIdUtils;
 
+import java.time.Instant;
 import java.util.Optional;
 
 public class MetadataEnrichmentStrategy implements EnrichmentStrategy {
@@ -49,8 +50,7 @@ public void enrich(final BatchItem batchItem, final EventType eventType)
     public void enrich(final NakadiRecord nakadiRecord, final EventType eventType) throws EnrichmentException {
         final var metadata = nakadiRecord.getMetadata();
         metadata.setPublishedBy(getPublisher());
-        final DateTime dateTime = new DateTime(DateTimeZone.UTC);
-        metadata.setReceivedAt(dateTime.getMillis());
+        metadata.setReceivedAt(Instant.now());
         if (metadata.getFlowId() == null || metadata.getFlowId().isEmpty()) {
             metadata.setFlowId(FlowIdUtils.peek());
         }
diff --git a/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/AvroDecodingException.java b/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/AvroDecodingException.java
new file mode 100644
index 0000000000..43aff1bc60
--- /dev/null
+++ b/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/AvroDecodingException.java
@@ -0,0 +1,11 @@
+package org.zalando.nakadi.exceptions.runtime;
+
+import org.apache.avro.AvroRuntimeException;
+
+public class AvroDecodingException extends NakadiBaseException {
+    public AvroDecodingException(
+            final String message,
+            final AvroRuntimeException exception) {
+        super(message, exception);
+    }
+}
diff --git a/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/NakadiRuntimeException.java b/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/NakadiRuntimeException.java
index 20313b178e..3124f04ae2 100644
--- a/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/NakadiRuntimeException.java
+++ b/core-common/src/main/java/org/zalando/nakadi/exceptions/runtime/NakadiRuntimeException.java
@@ -5,6 +5,11 @@ public class NakadiRuntimeException extends RuntimeException {
 
     private final Exception exception;
 
+    public NakadiRuntimeException(final String msg, final Exception exception) {
+        super(msg, exception);
+        this.exception = exception;
+    }
+
     public NakadiRuntimeException(final Exception exception) {
         super(exception);
         this.exception = exception;
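[Illustrative sketch, not part of the patch: the two new exception paths separate client errors from server errors — malformed Avro input surfaces as AvroDecodingException (mapped to 400 Bad Request by the new handlePayloadException above), while stream/IO failures become NakadiRuntimeException. A standalone demonstration of the underlying Avro behaviour the mapper relies on:]

    import java.io.ByteArrayInputStream;
    import org.apache.avro.message.BadHeaderException;
    import org.zalando.nakadi.generated.avro.PublishingBatch;

    public class BadBatchDemo {
        public static void main(final String[] args) throws Exception {
            // 16 zero bytes: not a valid single-object header (magic must be 0xC3 0x01).
            final byte[] garbage = new byte[16];
            try {
                PublishingBatch.getDecoder().decode(new ByteArrayInputStream(garbage), new PublishingBatch());
            } catch (final BadHeaderException e) { // an AvroRuntimeException subclass
                System.out.println("rejected as bad request: " + e.getMessage());
            }
        }
    }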
diff --git a/core-common/src/main/java/org/zalando/nakadi/mapper/NakadiRecordMapper.java b/core-common/src/main/java/org/zalando/nakadi/mapper/NakadiRecordMapper.java
new file mode 100644
index 0000000000..8a84f79d9b
--- /dev/null
+++ b/core-common/src/main/java/org/zalando/nakadi/mapper/NakadiRecordMapper.java
@@ -0,0 +1,140 @@
+package org.zalando.nakadi.mapper;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.stereotype.Service;
+import org.zalando.nakadi.domain.NakadiMetadata;
+import org.zalando.nakadi.domain.NakadiRecord;
+import org.zalando.nakadi.exceptions.runtime.AvroDecodingException;
+import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
+import org.zalando.nakadi.generated.avro.Envelope;
+import org.zalando.nakadi.generated.avro.Metadata;
+import org.zalando.nakadi.generated.avro.PublishingBatch;
+import org.zalando.nakadi.service.LocalSchemaRegistry;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+@Service
+public class NakadiRecordMapper {
+
+    public static final byte[] AVRO_FORMAT = new byte[]{(byte) 0x0};
+    public static final String HEADER_FORMAT = new String(AVRO_FORMAT);
+
+    public NakadiRecordMapper(final LocalSchemaRegistry localSchemaRegistry) {
+        localSchemaRegistry.getEventTypeSchemaVersions(LocalSchemaRegistry.BATCH_PUBLISHING_KEY)
+                .entrySet().forEach(entry -> {
+                            PublishingBatch.getDecoder().addSchema(entry.getValue());
+                            Envelope.getDecoder().addSchema(entry.getValue()
+                                    .getField("events").schema().getElementType());
+                        }
+                );
+    }
+
+    public List fromBytesBatch(final InputStream batch) {
+        final PublishingBatch publishingBatch;
+        try {
+            publishingBatch = PublishingBatch.getDecoder()
+                    .decode(batch, new PublishingBatch());
+        } catch (AvroRuntimeException are) {
+            throw new AvroDecodingException("failed to decode publishing batch", are);
+        } catch (IOException e) {
+            throw new NakadiRuntimeException("failed to decode publishing batch", e);
+        }
+
+        final List records = new LinkedList<>();
+        for (final Envelope envelope : publishingBatch.getEvents()) {
+            records.add(new NakadiRecord()
+                    .setMetadata(mapToNakadiMetadata(envelope.getMetadata()))
+                    .setPayload(envelope.getPayload().array()));
+        }
+
+        return records;
+    }
+
+    public Envelope fromBytesEnvelope(final InputStream data) {
+        try {
+            return Envelope.getDecoder().decode(data, new Envelope());
+        } catch (AvroRuntimeException are) {
+            throw new AvroDecodingException("failed to decode envelope", are);
+        } catch (IOException io) {
+            throw new NakadiRuntimeException("failed to decode envelope", io);
+        }
+    }
+
+    public NakadiMetadata mapToNakadiMetadata(final Metadata metadata) {
+        final NakadiMetadata nakadiMetadata = new NakadiMetadata();
+        nakadiMetadata.setEid(metadata.getEid());
+        nakadiMetadata.setEventType(metadata.getEventType());
+        nakadiMetadata.setEventOwner(metadata.getEventOwner());
+        nakadiMetadata.setFlowId(metadata.getFlowId());
+        nakadiMetadata.setOccurredAt(metadata.getOccurredAt());
+        nakadiMetadata.setPartition(metadata.getPartition());
+        nakadiMetadata.setParentEids(metadata.getParentEids());
+        nakadiMetadata.setPartitionCompactionKey(metadata.getPartitionCompactionKey());
+        nakadiMetadata.setPartitionKeys(metadata.getPartitionKeys());
+        nakadiMetadata.setPublishedBy(metadata.getPublishedBy());
+        nakadiMetadata.setReceivedAt(metadata.getReceivedAt());
+        nakadiMetadata.setSchemaVersion(metadata.getVersion());
+        nakadiMetadata.setSpanCtx(metadata.getSpanCtx());
+
+        return nakadiMetadata;
+    }
+
+    public ProducerRecord mapToProducerRecord(
+            final NakadiRecord nakadiRecord,
+            final String topic) throws IOException {
+        final var partition = nakadiRecord.getMetadata().getPartition();
+        final var partitionInt = (partition != null) ? Integer.valueOf(partition) : null;
+
+        final Envelope env = mapToEnvelope(nakadiRecord);
+        final ByteBuffer byteBuffer = Envelope.getEncoder().encode(env);
+
+        return new ProducerRecord<>(topic, partitionInt, nakadiRecord.getEventKey(), byteBuffer.array());
+    }
+
+    private Envelope mapToEnvelope(final NakadiRecord nakadiRecord) {
+        final NakadiMetadata nakadiMetadata = nakadiRecord.getMetadata();
+        final Metadata metadata = Metadata.newBuilder()
+                .setEid(nakadiMetadata.getEid())
+                .setEventType(nakadiMetadata.getEventType())
+                .setEventOwner(nakadiMetadata.getEventOwner())
+                .setFlowId(nakadiMetadata.getFlowId())
+                .setOccurredAt(nakadiMetadata.getOccurredAt())
+                .setPartition(nakadiMetadata.getPartition())
+                .setParentEids(nakadiMetadata.getParentEids())
+                .setPartitionCompactionKey(nakadiMetadata.getPartitionCompactionKey())
+                .setPartitionKeys(nakadiMetadata.getPartitionKeys())
+                .setPublishedBy(nakadiMetadata.getPublishedBy())
+                .setReceivedAt(nakadiMetadata.getReceivedAt())
+                .setSpanCtx(nakadiMetadata.getSpanCtx())
+                .setVersion(nakadiMetadata.getSchemaVersion())
+                .build();
+        return Envelope.newBuilder()
+                .setMetadata(metadata)
+                .setPayload(ByteBuffer.wrap(nakadiRecord.getPayload()))
+                .build();
+    }
+
+    public NakadiRecord fromAvroGenericRecord(final NakadiMetadata metadata,
+                                              final GenericRecord event) throws IOException {
+
+        final var payloadOutputStream = new ByteArrayOutputStream();
+        final var eventWriter = new GenericDatumWriter(event.getSchema());
+        eventWriter.write(event, EncoderFactory.get()
+                .directBinaryEncoder(payloadOutputStream, null));
+
+        return new NakadiRecord()
+                .setMetadata(metadata)
+                .setEventKey(null) // fixme remove it once event key implemented
+                .setPayload(payloadOutputStream.toByteArray());
+    }
+
+}
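[Illustrative sketch, not part of the patch: at its core, fromBytesBatch above is a single BinaryMessageDecoder call — the decoder reads the 10-byte single-object header (0xC3 0x01 plus a CRC-64-AVRO schema fingerprint) and resolves the writer schema from it; the addSchema() calls in the mapper's constructor are what allow batches written with the older PublishingBatchV0 schema to still be decoded. A minimal server-side decode:]

    import java.io.InputStream;
    import org.zalando.nakadi.generated.avro.Envelope;
    import org.zalando.nakadi.generated.avro.PublishingBatch;

    public class BatchDecodeSketch {
        static void dumpBatch(final InputStream body) throws Exception {
            final PublishingBatch batch =
                    PublishingBatch.getDecoder().decode(body, new PublishingBatch());
            for (final Envelope envelope : batch.getEvents()) {
                System.out.println(envelope.getMetadata().getEid()
                        + " -> " + envelope.getPayload().remaining() + " payload bytes");
            }
        }
    }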
diff --git a/core-common/src/main/java/org/zalando/nakadi/metrics/EventTypeMetrics.java b/core-common/src/main/java/org/zalando/nakadi/metrics/EventTypeMetrics.java
index 9031efa4d8..474eaf35ac 100644
--- a/core-common/src/main/java/org/zalando/nakadi/metrics/EventTypeMetrics.java
+++ b/core-common/src/main/java/org/zalando/nakadi/metrics/EventTypeMetrics.java
@@ -39,7 +39,7 @@ public EventTypeMetrics(final String eventTypeName, final MetricRegistry metricR
         publishingTimer = metricRegistry.timer(metricNameFor(eventTypeName, "publishing"));
     }
 
-    public void reportSizing(final int eventsPerBatch, final int totalEventSize) {
+    public void reportSizing(final int eventsPerBatch, final long totalEventSize) {
         eventsPerBatchHistogram.update(eventsPerBatch);
         eventCountMeter.mark(eventsPerBatch);
         trafficInBytesMeter.mark(totalEventSize);
diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java b/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java
index 001cd495fa..3c6351697c 100644
--- a/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java
+++ b/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java
@@ -11,6 +11,7 @@
 import org.zalando.nakadi.domain.storage.Storage;
 import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException;
 import org.zalando.nakadi.exceptions.runtime.TopicRepositoryException;
+import org.zalando.nakadi.mapper.NakadiRecordMapper;
 import org.zalando.nakadi.repository.kafka.KafkaFactory;
 import org.zalando.nakadi.repository.kafka.KafkaLocationManager;
 import org.zalando.nakadi.repository.kafka.KafkaSettings;
@@ -33,6 +34,7 @@ public class KafkaRepositoryCreator implements TopicRepositoryCreator {
     private final KafkaTopicConfigFactory kafkaTopicConfigFactory;
     private final MetricRegistry metricRegistry;
     private final ObjectMapper objectMapper;
+    private final NakadiRecordMapper nakadiRecordMapper;
 
     @Autowired
     public KafkaRepositoryCreator(
@@ -41,13 +43,15 @@ public KafkaRepositoryCreator(
             final ZookeeperSettings zookeeperSettings,
             final KafkaTopicConfigFactory kafkaTopicConfigFactory,
             final MetricRegistry metricRegistry,
-            final ObjectMapper objectMapper) {
+            final ObjectMapper objectMapper,
+            final NakadiRecordMapper nakadiRecordMapper) {
         this.nakadiSettings = nakadiSettings;
         this.kafkaSettings = kafkaSettings;
         this.zookeeperSettings = zookeeperSettings;
         this.kafkaTopicConfigFactory = kafkaTopicConfigFactory;
         this.metricRegistry = metricRegistry;
         this.objectMapper = objectMapper;
+        this.nakadiRecordMapper = nakadiRecordMapper;
     }
 
     @Override
@@ -69,10 +73,9 @@ public TopicRepository createTopicRepository(final Storage storage) throws Topic
                 .setKafkaFactory(kafkaFactory)
                 .setNakadiSettings(nakadiSettings)
                 .setKafkaSettings(kafkaSettings)
-                .setZookeeperSettings(zookeeperSettings)
                 .setKafkaTopicConfigFactory(kafkaTopicConfigFactory)
                 .setKafkaLocationManager(kafkaLocationManager)
-                .setMetricRegistry(metricRegistry)
+                .setNakadiRecordMapper(nakadiRecordMapper)
                 .build();
         // check that it does work
         kafkaTopicRepository.listTopics();
diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java
index 70104c7e97..28e89fed9c 100644
--- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java
+++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java
@@ -1,6 +1,5 @@
 package org.zalando.nakadi.repository.kafka;
 
-import com.codahale.metrics.MetricRegistry;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -45,10 +44,10 @@
 import org.zalando.nakadi.exceptions.runtime.TopicCreationException;
 import org.zalando.nakadi.exceptions.runtime.TopicDeletionException;
 import org.zalando.nakadi.exceptions.runtime.TopicRepositoryException;
+import org.zalando.nakadi.mapper.NakadiRecordMapper;
 import org.zalando.nakadi.repository.EventConsumer;
 import org.zalando.nakadi.repository.NakadiTopicConfig;
 import org.zalando.nakadi.repository.TopicRepository;
-import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -89,20 +88,18 @@ public class KafkaTopicRepository implements TopicRepository {
     private final KafkaFactory kafkaFactory;
     private final NakadiSettings nakadiSettings;
     private final KafkaSettings kafkaSettings;
-    private final ZookeeperSettings zookeeperSettings;
     private final KafkaTopicConfigFactory kafkaTopicConfigFactory;
     private final KafkaLocationManager kafkaLocationManager;
-    private final MetricRegistry metricRegistry;
+    private final NakadiRecordMapper nakadiRecordMapper;
 
     public KafkaTopicRepository(final Builder builder) {
         this.kafkaZookeeper = builder.kafkaZookeeper;
         this.kafkaFactory = builder.kafkaFactory;
         this.nakadiSettings = builder.nakadiSettings;
         this.kafkaSettings = builder.kafkaSettings;
-        this.zookeeperSettings = builder.zookeeperSettings;
         this.kafkaLocationManager = builder.kafkaLocationManager;
         this.kafkaTopicConfigFactory = builder.kafkaTopicConfigFactory;
-        this.metricRegistry = builder.metricRegistry;
+        this.nakadiRecordMapper = builder.nakadiRecordMapper;
     }
 
     public static class Builder {
@@ -110,10 +107,9 @@ public static class Builder {
         private KafkaFactory kafkaFactory;
         private NakadiSettings nakadiSettings;
         private KafkaSettings kafkaSettings;
-        private ZookeeperSettings zookeeperSettings;
         private KafkaTopicConfigFactory kafkaTopicConfigFactory;
         private KafkaLocationManager kafkaLocationManager;
-        private MetricRegistry metricRegistry;
+        private NakadiRecordMapper nakadiRecordMapper;
 
         public Builder setKafkaZookeeper(final KafkaZookeeper kafkaZookeeper) {
             this.kafkaZookeeper = kafkaZookeeper;
@@ -135,11 +131,6 @@ public Builder setKafkaSettings(final KafkaSettings kafkaSettings) {
             return this;
         }
 
-        public Builder setZookeeperSettings(final ZookeeperSettings zookeeperSettings) {
-            this.zookeeperSettings = zookeeperSettings;
-            return this;
-        }
-
         public Builder setKafkaTopicConfigFactory(final KafkaTopicConfigFactory kafkaTopicConfigFactory) {
             this.kafkaTopicConfigFactory = kafkaTopicConfigFactory;
             return this;
@@ -150,8 +141,8 @@ public Builder setKafkaLocationManager(final KafkaLocationManager kafkaLocationM
             return this;
         }
 
-        public Builder setMetricRegistry(final MetricRegistry metricRegistry) {
-            this.metricRegistry = metricRegistry;
+        public Builder setNakadiRecordMapper(final NakadiRecordMapper nakadiRecordMapper) {
+            this.nakadiRecordMapper = nakadiRecordMapper;
             return this;
         }
 
@@ -204,8 +195,8 @@ private static boolean isExceptionShouldLeadToReset(@Nullable final Exception ex
             return false;
         }
         return Stream.of(NotLeaderForPartitionException.class, UnknownTopicOrPartitionException.class,
-                org.apache.kafka.common.errors.TimeoutException.class, NetworkException.class,
-                UnknownServerException.class)
+                        org.apache.kafka.common.errors.TimeoutException.class, NetworkException.class,
+                        UnknownServerException.class)
                 .anyMatch(clazz -> clazz.isAssignableFrom(exception.getClass()));
     }
 
@@ -438,15 +429,13 @@ public List sendEvents(final String topic, final
         Map responses = new ConcurrentHashMap<>();
         try {
             for (final NakadiRecord nakadiRecord : nakadiRecords) {
-                final ProducerRecord producerRecord = nakadiRecord.toProducerRecord(topic);
+                final ProducerRecord producerRecord =
+                        nakadiRecordMapper.mapToProducerRecord(nakadiRecord, topic);
 
                 if (null != nakadiRecord.getOwner()) {
                     nakadiRecord.getOwner().serialize(producerRecord);
                 }
 
-                producerRecord.headers().add(
-                        NakadiRecord.HEADER_FORMAT,
-                        nakadiRecord.getFormat());
                 producer.send(producerRecord, ((metadata, exception) -> {
                     try {
                         final NakadiRecordResult result;
diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java
index ee69987a06..b0705de2b0 100644
--- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java
+++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/NakadiKafkaConsumer.java
@@ -8,8 +8,8 @@
 import org.apache.kafka.common.header.Headers;
 import org.zalando.nakadi.domain.ConsumedEvent;
 import org.zalando.nakadi.domain.EventOwnerHeader;
-import org.zalando.nakadi.domain.NakadiRecord;
 import org.zalando.nakadi.domain.Timeline;
+import org.zalando.nakadi.mapper.NakadiRecordMapper;
 import org.zalando.nakadi.repository.EventConsumer;
 
 import java.util.ArrayList;
@@ -68,8 +68,8 @@ public List readEvents() {
                 final Timeline timeline = timelineMap.get(new TopicPartition(record.topic(), record.partition()));
                 result.add(new ConsumedEvent(
-                        recordDeserializer.deserialize(
-                                getHeaderValue(record.headers(), NakadiRecord.HEADER_FORMAT),
+                        recordDeserializer.deserializeToJsonBytes(
+                                getHeaderValue(record.headers(), NakadiRecordMapper.HEADER_FORMAT),
                                 record.value()),
                         cursor.toNakadiCursor(timeline),
                         record.timestamp(),
diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/RecordDeserializer.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/RecordDeserializer.java
index ad2bf23516..780ec15b7b 100644
--- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/RecordDeserializer.java
+++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/RecordDeserializer.java
@@ -1,5 +1,5 @@
 package org.zalando.nakadi.repository.kafka;
 
 public interface RecordDeserializer {
-    byte[] deserialize(byte[] eventFormat, byte[] data);
+    byte[] deserializeToJsonBytes(byte[] eventFormat, byte[] data);
 }
diff --git a/core-common/src/main/java/org/zalando/nakadi/service/LocalSchemaRegistry.java b/core-common/src/main/java/org/zalando/nakadi/service/LocalSchemaRegistry.java
index 3c9040c627..cf2fbdeab5 100644
--- a/core-common/src/main/java/org/zalando/nakadi/service/LocalSchemaRegistry.java
+++ b/core-common/src/main/java/org/zalando/nakadi/service/LocalSchemaRegistry.java
@@ -1,7 +1,5 @@
 package org.zalando.nakadi.service;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.avro.AvroMapper;
 import org.apache.avro.Schema;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -27,10 +25,12 @@ public class LocalSchemaRegistry {
 
     public static final String METADATA_KEY = "metadata";
+    public static final String BATCH_PUBLISHING_KEY = "batch.publishing";
     private static final Comparator SCHEMA_VERSION_COMPARATOR = Comparator.comparingInt(Integer::parseInt);
     private static final Collection INTERNAL_EVENT_TYPE_NAMES = Set.of(
             METADATA_KEY,
+            BATCH_PUBLISHING_KEY,
             KPIEventTypes.ACCESS_LOG,
             KPIEventTypes.BATCH_PUBLISHED,
             KPIEventTypes.DATA_STREAMED,
@@ -38,19 +38,11 @@ public class LocalSchemaRegistry {
             KPIEventTypes.SUBSCRIPTION_LOG);
 
     private final Map> eventTypeSchema;
-    private final AvroMapper avroMapper;
-    private final ObjectMapper objectMapper;
 
     @Autowired
     public LocalSchemaRegistry(
-            final AvroMapper avroMapper,
-            final ObjectMapper objectMapper,
-            @Value("${nakadi.avro.schema.root:classpath:event-type-schema/}") final Resource eventTypeSchemaRes)
+            @Value("${nakadi.avro.schema.root:classpath:avro-schema/}") final Resource eventTypeSchemaRes)
             throws IOException {
-
-        this.avroMapper = avroMapper;
-        this.objectMapper = objectMapper;
-
         this.eventTypeSchema = new HashMap<>();
 
         for (final String eventTypeName : INTERNAL_EVENT_TYPE_NAMES) {
@@ -80,14 +72,6 @@ private TreeMap loadEventTypeSchemaVersionsFromResource(
         return versionToSchema;
     }
 
-    public AvroMapper getAvroMapper() {
-        return avroMapper;
-    }
-
-    public ObjectMapper getObjectMapper() {
-        return objectMapper;
-    }
-
     public VersionedAvroSchema getLatestEventTypeSchemaVersion(final String eventTypeName) {
         final var entry = getEventTypeSchemaVersions(eventTypeName).lastEntry();
         return new VersionedAvroSchema(entry.getValue(), entry.getKey());
@@ -102,7 +86,7 @@ public Schema getEventTypeSchema(final String eventTypeName, final String schema
         return schema;
     }
 
-    private TreeMap getEventTypeSchemaVersions(final String eventTypeName) {
+    public TreeMap getEventTypeSchemaVersions(final String eventTypeName) {
         final TreeMap versionToSchema = eventTypeSchema.get(eventTypeName);
         if (versionToSchema == null) {
             throw new NoSuchEventTypeException("Avro event type not found: " + eventTypeName);
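[Illustrative sketch, not part of the patch: getEventTypeSchemaVersions() was made public so that NakadiRecordMapper can register every batch.publishing schema version with the generated decoders. Assuming a Spring-wired LocalSchemaRegistry as above, and assuming the map is keyed by version string to Schema (as the loading code and the version comparator suggest — lastEntry() being the latest version), listing the known versions looks like this:]

    import java.util.TreeMap;
    import org.apache.avro.Schema;
    import org.zalando.nakadi.service.LocalSchemaRegistry;

    public class SchemaVersionsSketch {
        static void listBatchPublishingVersions(final LocalSchemaRegistry registry) {
            final TreeMap<String, Schema> versions =
                    registry.getEventTypeSchemaVersions(LocalSchemaRegistry.BATCH_PUBLISHING_KEY);
            versions.forEach((version, schema) ->
                    System.out.println("batch.publishing v" + version + ": " + schema.getFullName()));
        }
    }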
(versionToSchema == null) { throw new NoSuchEventTypeException("Avro event type not found: " + eventTypeName); diff --git a/core-common/src/main/resources/avro-schema/batch.publishing/batch.publishing.0.avsc b/core-common/src/main/resources/avro-schema/batch.publishing/batch.publishing.0.avsc new file mode 100644 index 0000000000..7f5925ae5d --- /dev/null +++ b/core-common/src/main/resources/avro-schema/batch.publishing/batch.publishing.0.avsc @@ -0,0 +1,136 @@ +{ + "name": "PublishingBatchV0", + "namespace": "org.zalando.nakadi.generated.avro", + "type": "record", + "fields": [ + { + "name": "events", + "type": { + "type": "array", + "items": { + "name": "EnvelopeV0", + "namespace": "org.zalando.nakadi.generated.avro", + "type": "record", + "fields": [ + { + "name": "metadata", + "type": { + "name": "MetadataV0", + "type": "record", + "namespace": "org.zalando.nakadi.generated.avro", + "doc": "Event metadata defines data about the payload and additional information for Nakadi operations", + "fields": [ + { + "name": "occurred_at", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "eid", + "type": { + "type": "string", + "logicalType": "uuid" + } + }, + { + "name": "flow_id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "received_at", + "type": [ + "null", + { + "type": "long", + "logicalType": "timestamp-millis" + } + ], + "default": null + }, + { + "name": "version", + "type": "string" + }, + { + "name": "published_by", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "event_type", + "type": "string" + }, + { + "name": "partition", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "partition_keys", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + }, + { + "name": "partition_compaction_key", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "parent_eids", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "string", + "logicalType": "uuid" + } + } + ], + "default": null + }, + { + "name": "span_ctx", + "type": [ + "null", + "string" + ], + "default": null + } + ] + } + }, + { + "name": "payload", + "type": { + "type": "bytes" + } + } + ] + } + } + } + ] +} + diff --git a/core-common/src/main/resources/avro-schema/batch.publishing/batch.publishing.1.avsc b/core-common/src/main/resources/avro-schema/batch.publishing/batch.publishing.1.avsc new file mode 100644 index 0000000000..3be2dedf6e --- /dev/null +++ b/core-common/src/main/resources/avro-schema/batch.publishing/batch.publishing.1.avsc @@ -0,0 +1,143 @@ +{ + "name": "PublishingBatch", + "namespace": "org.zalando.nakadi.generated.avro", + "type": "record", + "fields": [ + { + "name": "events", + "type": { + "type": "array", + "items": { + "name": "Envelope", + "namespace": "org.zalando.nakadi.generated.avro", + "type": "record", + "fields": [ + { + "name": "metadata", + "type": { + "name": "Metadata", + "type": "record", + "namespace": "org.zalando.nakadi.generated.avro", + "doc": "Event metadata defines data about the payload and additional information for Nakadi operations", + "fields": [ + { + "name": "occurred_at", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "eid", + "type": { + "type": "string", + "logicalType": "uuid" + } + }, + { + "name": "flow_id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "received_at", + "type": [ + "null", + { + "type": "long", + 
"logicalType": "timestamp-millis" + } + ], + "default": null + }, + { + "name": "version", + "type": "string" + }, + { + "name": "published_by", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "event_type", + "type": "string" + }, + { + "name": "partition", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "partition_keys", + "type": [ + "null", + { + "type": "array", + "items": "string" + } + ], + "default": null + }, + { + "name": "partition_compaction_key", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "parent_eids", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "string", + "logicalType": "uuid" + } + } + ], + "default": null + }, + { + "name": "span_ctx", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "event_owner", + "type": [ + "null", + "string" + ], + "default": null + } + ] + } + }, + { + "name": "payload", + "type": { + "type": "bytes" + } + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/core-common/src/main/resources/event-type-schema/metadata/metadata.0.avsc b/core-common/src/main/resources/avro-schema/metadata/metadata.0.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/metadata/metadata.0.avsc rename to core-common/src/main/resources/avro-schema/metadata/metadata.0.avsc diff --git a/core-common/src/main/resources/event-type-schema/metadata/metadata.1.avsc b/core-common/src/main/resources/avro-schema/metadata/metadata.1.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/metadata/metadata.1.avsc rename to core-common/src/main/resources/avro-schema/metadata/metadata.1.avsc diff --git a/core-common/src/main/resources/event-type-schema/metadata/metadata.2.avsc b/core-common/src/main/resources/avro-schema/metadata/metadata.2.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/metadata/metadata.2.avsc rename to core-common/src/main/resources/avro-schema/metadata/metadata.2.avsc diff --git a/core-common/src/main/resources/event-type-schema/metadata/metadata.3.avsc b/core-common/src/main/resources/avro-schema/metadata/metadata.3.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/metadata/metadata.3.avsc rename to core-common/src/main/resources/avro-schema/metadata/metadata.3.avsc diff --git a/core-common/src/main/resources/event-type-schema/metadata/metadata.4.avsc b/core-common/src/main/resources/avro-schema/metadata/metadata.4.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/metadata/metadata.4.avsc rename to core-common/src/main/resources/avro-schema/metadata/metadata.4.avsc diff --git a/core-common/src/main/resources/event-type-schema/metadata/metadata.5.avsc b/core-common/src/main/resources/avro-schema/metadata/metadata.5.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/metadata/metadata.5.avsc rename to core-common/src/main/resources/avro-schema/metadata/metadata.5.avsc diff --git a/core-common/src/main/resources/event-type-schema/nakadi.access.log/nakadi.access.log.0.avsc b/core-common/src/main/resources/avro-schema/nakadi.access.log/nakadi.access.log.0.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/nakadi.access.log/nakadi.access.log.0.avsc rename to core-common/src/main/resources/avro-schema/nakadi.access.log/nakadi.access.log.0.avsc diff --git 
a/core-common/src/main/resources/event-type-schema/nakadi.access.log/nakadi.access.log.1.avsc b/core-common/src/main/resources/avro-schema/nakadi.access.log/nakadi.access.log.1.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/nakadi.access.log/nakadi.access.log.1.avsc rename to core-common/src/main/resources/avro-schema/nakadi.access.log/nakadi.access.log.1.avsc diff --git a/core-common/src/main/resources/event-type-schema/nakadi.batch.published/nakadi.batch.published.0.avsc b/core-common/src/main/resources/avro-schema/nakadi.batch.published/nakadi.batch.published.0.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/nakadi.batch.published/nakadi.batch.published.0.avsc rename to core-common/src/main/resources/avro-schema/nakadi.batch.published/nakadi.batch.published.0.avsc diff --git a/core-common/src/main/resources/avro-schema/nakadi.batch.published/nakadi.batch.published.1.avsc b/core-common/src/main/resources/avro-schema/nakadi.batch.published/nakadi.batch.published.1.avsc new file mode 100644 index 0000000000..ec8ca36416 --- /dev/null +++ b/core-common/src/main/resources/avro-schema/nakadi.batch.published/nakadi.batch.published.1.avsc @@ -0,0 +1,35 @@ +{ + "name": "nakadi.batch.published", + "type": "record", + "doc": "Stores KPI event of type nakadi.batch.published", + "fields": [ + { + "name": "event_type", + "type": "string" + }, + { + "name": "app", + "type": "string" + }, + { + "name": "app_hashed", + "type": "string" + }, + { + "name": "token_realm", + "type": "string" + }, + { + "name": "number_of_events", + "type": "int" + }, + { + "name": "ms_spent", + "type": "long" + }, + { + "name": "batch_size", + "type": "long" + } + ] +} diff --git a/core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc b/core-common/src/main/resources/avro-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc rename to core-common/src/main/resources/avro-schema/nakadi.data.streamed/nakadi.data.streamed.0.avsc diff --git a/core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc b/core-common/src/main/resources/avro-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc rename to core-common/src/main/resources/avro-schema/nakadi.event.type.log/nakadi.event.type.log.0.avsc diff --git a/core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc b/core-common/src/main/resources/avro-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc similarity index 100% rename from core-common/src/main/resources/event-type-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc rename to core-common/src/main/resources/avro-schema/nakadi.subscription.log/nakadi.subscription.log.0.avsc diff --git a/core-common/src/test/java/org/zalando/nakadi/domain/kpi/AccessLogEventTest.java b/core-common/src/test/java/org/zalando/nakadi/domain/kpi/AccessLogEventTest.java index 6e11cccaa9..3a743fe71a 100644 --- a/core-common/src/test/java/org/zalando/nakadi/domain/kpi/AccessLogEventTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/domain/kpi/AccessLogEventTest.java @@ -1,24 +1,16 @@ package org.zalando.nakadi.domain.kpi; -import 
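For reference, a record conforming to the new nakadi.batch.published version 1 schema above could be assembled as follows (a sketch with purely illustrative values; registry is a LocalSchemaRegistry as constructed earlier):

    final Schema kpiSchema = registry.getEventTypeSchema("nakadi.batch.published", "1");
    final GenericRecord kpi = new GenericRecordBuilder(kpiSchema)
            .set("event_type", "some.event.type")
            .set("app", "example-app")
            .set("app_hashed", "2a9d01f4")   // illustrative hash value
            .set("token_realm", "/services") // illustrative realm
            .set("number_of_events", 3)
            .set("ms_spent", 42L)
            .set("batch_size", 2048L)
            .build();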
com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.avro.AvroMapper; import org.junit.Test; -import org.springframework.core.io.DefaultResourceLoader; -import org.zalando.nakadi.service.LocalSchemaRegistry; import org.zalando.nakadi.service.KPIEventMapper; -import java.io.IOException; import java.util.Set; import static org.junit.Assert.assertEquals; public class AccessLogEventTest { - private final LocalSchemaRegistry localSchemaRegistry; private final KPIEventMapper eventMapper; - public AccessLogEventTest() throws IOException { - final var eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/"); - this.localSchemaRegistry = new LocalSchemaRegistry(new AvroMapper(), new ObjectMapper(), eventTypeRes); + public AccessLogEventTest() { this.eventMapper = new KPIEventMapper(Set.of(AccessLogEvent.class)); } diff --git a/core-common/src/test/java/org/zalando/nakadi/domain/kpi/BatchPublishedEventTest.java b/core-common/src/test/java/org/zalando/nakadi/domain/kpi/BatchPublishedEventTest.java index f183a62c56..7b75b7a296 100644 --- a/core-common/src/test/java/org/zalando/nakadi/domain/kpi/BatchPublishedEventTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/domain/kpi/BatchPublishedEventTest.java @@ -1,10 +1,8 @@ package org.zalando.nakadi.domain.kpi; import org.junit.Test; -import org.springframework.core.io.DefaultResourceLoader; import org.zalando.nakadi.service.KPIEventMapper; -import java.io.IOException; import java.util.Set; import java.util.UUID; @@ -13,8 +11,7 @@ public class BatchPublishedEventTest { private final KPIEventMapper eventMapper; - public BatchPublishedEventTest() throws IOException { - final var eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/"); + public BatchPublishedEventTest() { this.eventMapper = new KPIEventMapper(Set.of(BatchPublishedEvent.class)); } diff --git a/core-common/src/test/java/org/zalando/nakadi/domain/kpi/DataStreamedEventTest.java b/core-common/src/test/java/org/zalando/nakadi/domain/kpi/DataStreamedEventTest.java index ccb006e1a0..8b40fa7895 100644 --- a/core-common/src/test/java/org/zalando/nakadi/domain/kpi/DataStreamedEventTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/domain/kpi/DataStreamedEventTest.java @@ -1,13 +1,8 @@ package org.zalando.nakadi.domain.kpi; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.avro.AvroMapper; import org.junit.Test; -import org.springframework.core.io.DefaultResourceLoader; -import org.zalando.nakadi.service.LocalSchemaRegistry; import org.zalando.nakadi.service.KPIEventMapper; -import java.io.IOException; import java.util.Set; import java.util.UUID; @@ -15,12 +10,9 @@ import static org.junit.Assert.assertNull; public class DataStreamedEventTest { - private final LocalSchemaRegistry localSchemaRegistry; private final KPIEventMapper eventMapper; - public DataStreamedEventTest() throws IOException { - final var eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/"); - this.localSchemaRegistry = new LocalSchemaRegistry(new AvroMapper(), new ObjectMapper(), eventTypeRes); + public DataStreamedEventTest() { this.eventMapper = new KPIEventMapper(Set.of(DataStreamedEvent.class)); } diff --git a/core-common/src/test/java/org/zalando/nakadi/domain/kpi/EventTypeLogEventTest.java b/core-common/src/test/java/org/zalando/nakadi/domain/kpi/EventTypeLogEventTest.java index 076fb68fc8..08c44f485d 100644 --- 
a/core-common/src/test/java/org/zalando/nakadi/domain/kpi/EventTypeLogEventTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/domain/kpi/EventTypeLogEventTest.java @@ -1,25 +1,17 @@ package org.zalando.nakadi.domain.kpi; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.avro.AvroMapper; import org.junit.Test; -import org.springframework.core.io.DefaultResourceLoader; -import org.zalando.nakadi.service.LocalSchemaRegistry; import org.zalando.nakadi.service.KPIEventMapper; -import java.io.IOException; import java.util.Set; import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertEquals; public class EventTypeLogEventTest { - private final LocalSchemaRegistry localSchemaRegistry; private final KPIEventMapper eventMapper; - public EventTypeLogEventTest() throws IOException { - final var eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/"); - this.localSchemaRegistry = new LocalSchemaRegistry(new AvroMapper(), new ObjectMapper(), eventTypeRes); + public EventTypeLogEventTest() { this.eventMapper = new KPIEventMapper(Set.of(EventTypeLogEvent.class)); } diff --git a/core-common/src/test/java/org/zalando/nakadi/domain/kpi/SubscriptionLogEventTest.java b/core-common/src/test/java/org/zalando/nakadi/domain/kpi/SubscriptionLogEventTest.java index 8bcae542a6..fa15d980d5 100644 --- a/core-common/src/test/java/org/zalando/nakadi/domain/kpi/SubscriptionLogEventTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/domain/kpi/SubscriptionLogEventTest.java @@ -1,26 +1,18 @@ package org.zalando.nakadi.domain.kpi; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.avro.AvroMapper; import org.junit.Test; -import org.springframework.core.io.DefaultResourceLoader; -import org.zalando.nakadi.service.LocalSchemaRegistry; import org.zalando.nakadi.service.KPIEventMapper; -import java.io.IOException; import java.util.Set; import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertEquals; public class SubscriptionLogEventTest { - private final LocalSchemaRegistry localSchemaRegistry; private final KPIEventMapper eventMapper; - public SubscriptionLogEventTest() throws IOException { - final var eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/"); - this.localSchemaRegistry = new LocalSchemaRegistry(new AvroMapper(), new ObjectMapper(), eventTypeRes); + public SubscriptionLogEventTest() { this.eventMapper = new KPIEventMapper(Set.of(SubscriptionLogEvent.class)); } diff --git a/core-common/src/test/java/org/zalando/nakadi/mapper/NakadiRecordMapperTest.java b/core-common/src/test/java/org/zalando/nakadi/mapper/NakadiRecordMapperTest.java new file mode 100644 index 0000000000..1be0af02e9 --- /dev/null +++ b/core-common/src/test/java/org/zalando/nakadi/mapper/NakadiRecordMapperTest.java @@ -0,0 +1,83 @@ +package org.zalando.nakadi.mapper; + +import org.junit.Assert; +import org.junit.Test; +import org.springframework.core.io.DefaultResourceLoader; +import org.springframework.core.io.Resource; +import org.zalando.nakadi.domain.NakadiMetadata; +import org.zalando.nakadi.domain.NakadiRecord; +import org.zalando.nakadi.generated.avro.Envelope; +import org.zalando.nakadi.generated.avro.EnvelopeV0; +import org.zalando.nakadi.generated.avro.Metadata; +import org.zalando.nakadi.generated.avro.MetadataV0; +import org.zalando.nakadi.generated.avro.PublishingBatch; +import org.zalando.nakadi.generated.avro.PublishingBatchV0; +import 
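The KPI tests above shed their LocalSchemaRegistry scaffolding because KPIEventMapper derives everything from the KPIEvent subclasses themselves. A sketch of the remaining usage, assuming the fluent setters on SubscriptionLogEvent (mapToGenericRecord is the same call NakadiKpiPublisher makes below):

    final KPIEventMapper mapper = new KPIEventMapper(Set.of(SubscriptionLogEvent.class));
    // Turns the typed KPI event into a GenericRecord matching its Avro schema.
    final GenericRecord record = mapper.mapToGenericRecord(
            new SubscriptionLogEvent()
                    .setSubscriptionId(UUID.randomUUID().toString())
                    .setStatus("created"));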
org.zalando.nakadi.service.LocalSchemaRegistry; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.List; +import java.util.UUID; + +public class NakadiRecordMapperTest { + + @Test + public void testFromBytesBatch() throws IOException { + final Resource eventTypeRes = new DefaultResourceLoader().getResource("avro-schema/"); + final LocalSchemaRegistry localSchemaRegistry = new LocalSchemaRegistry(eventTypeRes); + + final PublishingBatch batch = PublishingBatch.newBuilder() + .setEvents(List.of(Envelope.newBuilder() + .setMetadata(Metadata.newBuilder() + .setEid(UUID.randomUUID().toString()) + .setOccurredAt(Instant.now()) + .setVersion("1.0.0") + .setEventType("some.event.type") + .build()) + .setPayload(ByteBuffer.wrap("First record for testing !!!" + .getBytes(StandardCharsets.UTF_8))).build())) + .build(); + + final ByteBuffer byteBuffer = PublishingBatch.getEncoder().encode(batch); + final NakadiRecordMapper mapper = new NakadiRecordMapper(localSchemaRegistry); + final List nakadiRecords = + mapper.fromBytesBatch(new ByteArrayInputStream(byteBuffer.array())); + + Assert.assertEquals( + batch.getEvents().get(0).getMetadata().getEid(), + nakadiRecords.get(0).getMetadata().getEid() + ); + } + + @Test + public void testFromBytesBatchDifferentVersions() throws IOException { + final Resource eventTypeRes = new DefaultResourceLoader().getResource("avro-schema/"); + final LocalSchemaRegistry localSchemaRegistry = new LocalSchemaRegistry(eventTypeRes); + + final String eventEid = "AB5D12E9-8376-4584-802C-3AFA1CA1D97C"; + final PublishingBatchV0 batch = PublishingBatchV0.newBuilder() + .setEvents(List.of(EnvelopeV0.newBuilder() + .setMetadata(MetadataV0.newBuilder() + .setEid(eventEid) + .setOccurredAt(Instant.now()) + .setVersion("1.0.0") + .setEventType("some.event.type") + .build()) + .setPayload(ByteBuffer.wrap("First record for testing !!!" 
+ .getBytes(StandardCharsets.UTF_8))).build())) + .build(); + + final ByteBuffer byteBuffer = PublishingBatchV0.getEncoder().encode(batch); + final NakadiRecordMapper mapper = new NakadiRecordMapper(localSchemaRegistry); + final List nakadiRecords = + mapper.fromBytesBatch(new ByteArrayInputStream(byteBuffer.array())); + + final NakadiMetadata metadata = nakadiRecords.get(0).getMetadata(); + Assert.assertNull(metadata.getEventOwner()); + Assert.assertEquals(eventEid, metadata.getEid()); + } + +} diff --git a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java index 20a609dac3..f9dd7b294e 100644 --- a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java +++ b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java @@ -1,8 +1,5 @@ package org.zalando.nakadi.repository.kafka; -import com.codahale.metrics.MetricRegistry; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.avro.AvroMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.kafka.clients.consumer.Consumer; @@ -20,13 +17,11 @@ import org.mockito.Captor; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; -import org.springframework.core.io.DefaultResourceLoader; import org.zalando.nakadi.config.NakadiSettings; import org.zalando.nakadi.domain.BatchItem; import org.zalando.nakadi.domain.CursorError; import org.zalando.nakadi.domain.EventOwnerHeader; import org.zalando.nakadi.domain.EventPublishingStatus; -import org.zalando.nakadi.domain.NakadiAvroMetadata; import org.zalando.nakadi.domain.NakadiCursor; import org.zalando.nakadi.domain.NakadiMetadata; import org.zalando.nakadi.domain.NakadiRecord; @@ -37,8 +32,8 @@ import org.zalando.nakadi.domain.TopicPartition; import org.zalando.nakadi.exceptions.runtime.EventPublishingException; import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; -import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings; -import org.zalando.nakadi.service.LocalSchemaRegistry; +import org.zalando.nakadi.mapper.NakadiRecordMapper; +import org.zalando.nakadi.utils.TestUtils; import org.zalando.nakadi.view.Cursor; import java.io.IOException; @@ -82,12 +77,11 @@ public class KafkaTopicRepositoryTest { private static final Node NODE = new Node(1, "host", 9091); private final NakadiSettings nakadiSettings = mock(NakadiSettings.class); private final KafkaSettings kafkaSettings = mock(KafkaSettings.class); - private final ZookeeperSettings zookeeperSettings = mock(ZookeeperSettings.class); private final KafkaTopicConfigFactory kafkaTopicConfigFactory = mock(KafkaTopicConfigFactory.class); private final KafkaLocationManager kafkaLocationManager = mock(KafkaLocationManager.class); + private NakadiRecordMapper nakadiRecordMapper; private static final String KAFKA_CLIENT_ID = "application_name-topic_name"; private final RecordDeserializer recordDeserializer = (f, e) -> e; - private final LocalSchemaRegistry localSchemaRegistry; @Captor private ArgumentCaptor> producerRecordArgumentCaptor; @@ -134,14 +128,12 @@ public KafkaTopicRepositoryTest() throws IOException { when(kafkaProducer.partitionsFor(anyString())).then( invocation -> partitionsOfTopic((String) invocation.getArguments()[0]) ); + nakadiRecordMapper = TestUtils.getNakadiRecordMapper(); kafkaFactory = createKafkaFactory(); - 
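Note that testFromBytesBatch and testFromBytesBatchDifferentVersions above push both PublishingBatch and PublishingBatchV0 bytes through the same fromBytesBatch call. The mapper implementation itself is outside this excerpt; with Avro's message API, one plausible shape is a single decoder for the current schema that is also taught the old writer schema, so V0 batches resolve into the new class with event_owner falling back to its null default, which is exactly what the second test asserts:

    // Sketch only; an assumption about NakadiRecordMapper's internals, where
    // batchBytes holds the single-object-encoded wire payload.
    final BinaryMessageDecoder<PublishingBatch> decoder =
            new BinaryMessageDecoder<>(SpecificData.get(), PublishingBatch.getClassSchema());
    decoder.addSchema(PublishingBatchV0.getClassSchema()); // old batches resolve by fingerprint
    final PublishingBatch batch = decoder.decode(new ByteArrayInputStream(batchBytes));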
kafkaTopicRepository = createKafkaRepository(kafkaFactory, new MetricRegistry()); + kafkaTopicRepository = createKafkaRepository(kafkaFactory); MockitoAnnotations.initMocks(this); - final var eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/"); - this.localSchemaRegistry = new LocalSchemaRegistry(new AvroMapper(), new ObjectMapper(), eventTypeRes); } - @Test public void canListAllTopics() { final List allTopics = allTopics().stream().collect(toList()); @@ -549,18 +541,16 @@ private static Cursor cursor(final String partition, final String offset) { return new Cursor(partition, offset); } - private KafkaTopicRepository createKafkaRepository(final KafkaFactory kafkaFactory, - final MetricRegistry metricRegistry) { + private KafkaTopicRepository createKafkaRepository(final KafkaFactory kafkaFactory) { try { return new KafkaTopicRepository.Builder() .setKafkaZookeeper(createKafkaZookeeper()) .setKafkaFactory(kafkaFactory) .setNakadiSettings(nakadiSettings) .setKafkaSettings(kafkaSettings) - .setZookeeperSettings(zookeeperSettings) .setKafkaTopicConfigFactory(kafkaTopicConfigFactory) .setKafkaLocationManager(kafkaLocationManager) - .setMetricRegistry(metricRegistry) + .setNakadiRecordMapper(nakadiRecordMapper) .build(); } catch (final Exception e) { throw new RuntimeException(e); @@ -637,19 +627,16 @@ private ProducerRecord captureProducerRecordSent() { } private NakadiRecord getTestNakadiRecord(final String partition) { - final NakadiMetadata metadata = new NakadiAvroMetadata((byte) 1, - localSchemaRegistry.getLatestEventTypeSchemaVersion( - LocalSchemaRegistry.METADATA_KEY).getSchema()); + final NakadiMetadata metadata = new NakadiMetadata(); metadata.setEid(UUID.randomUUID().toString()); - metadata.setOccurredAt(Instant.now().toEpochMilli()); + metadata.setOccurredAt(Instant.now()); metadata.setSchemaVersion("0"); metadata.setPartition(partition); metadata.setEventType("test-event"); return new NakadiRecord() .setMetadata(metadata) - .setPayload(new byte[0]) - .setFormat(NakadiRecord.Format.AVRO.getFormat()); + .setPayload(new byte[0]); } } diff --git a/core-common/src/test/java/org/zalando/nakadi/utils/TestUtils.java b/core-common/src/test/java/org/zalando/nakadi/utils/TestUtils.java index 5b52c28b5e..1729f3d567 100644 --- a/core-common/src/test/java/org/zalando/nakadi/utils/TestUtils.java +++ b/core-common/src/test/java/org/zalando/nakadi/utils/TestUtils.java @@ -9,6 +9,7 @@ import org.joda.time.DateTimeZone; import org.json.JSONArray; import org.json.JSONObject; +import org.springframework.core.io.DefaultResourceLoader; import org.springframework.http.converter.StringHttpMessageConverter; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import org.springframework.test.web.servlet.MockMvc; @@ -23,10 +24,12 @@ import org.zalando.nakadi.domain.Timeline; import org.zalando.nakadi.domain.storage.Storage; import org.zalando.nakadi.exceptions.runtime.AccessDeniedException; +import org.zalando.nakadi.mapper.NakadiRecordMapper; import org.zalando.nakadi.plugin.api.authz.AuthorizationAttribute; import org.zalando.nakadi.plugin.api.authz.AuthorizationService; import org.zalando.nakadi.plugin.api.authz.Resource; import org.zalando.nakadi.problem.ValidationProblem; +import org.zalando.nakadi.service.LocalSchemaRegistry; import org.zalando.problem.Problem; import java.io.IOException; @@ -248,4 +251,13 @@ public static String toTimelineOffset(final long offset) { return String.format("001-0001-%018d", offset); } + public static 
NakadiRecordMapper getNakadiRecordMapper() throws IOException { + final var eventTypeRes = new DefaultResourceLoader().getResource("avro-schema/"); + return new NakadiRecordMapper(new LocalSchemaRegistry(eventTypeRes)); + } + + public static LocalSchemaRegistry getLocalSchemaRegistry() throws IOException { + return new LocalSchemaRegistry(new DefaultResourceLoader().getResource("avro-schema/")); + } + } diff --git a/core-services/src/main/java/org/zalando/nakadi/repository/kafka/AvroDeserializerWithSequenceDecoder.java b/core-services/src/main/java/org/zalando/nakadi/repository/kafka/AvroDeserializerWithSequenceDecoder.java index 84ea2ad6b8..cef65fd7ed 100644 --- a/core-services/src/main/java/org/zalando/nakadi/repository/kafka/AvroDeserializerWithSequenceDecoder.java +++ b/core-services/src/main/java/org/zalando/nakadi/repository/kafka/AvroDeserializerWithSequenceDecoder.java @@ -5,7 +5,6 @@ import org.joda.time.DateTimeZone; import org.json.JSONObject; import org.zalando.nakadi.domain.EnvelopeHolder; -import org.zalando.nakadi.domain.NakadiAvroMetadata; import org.zalando.nakadi.service.LocalSchemaRegistry; import org.zalando.nakadi.service.SchemaProviderService; @@ -26,12 +25,11 @@ public AvroDeserializerWithSequenceDecoder( final LocalSchemaRegistry localSchemaRegistry) { this.schemaService = schemaService; this.localSchemaRegistry = localSchemaRegistry; - this.metadataSequenceDecoders = new HashMap<>(); this.eventSequenceDecoders = new HashMap<>(); } - public byte[] deserializeAvro(final EnvelopeHolder envelope) throws RuntimeException { + public byte[] deserializeAvroToJsonBytes(final EnvelopeHolder envelope) throws RuntimeException { try { final byte metadataVersion = envelope.getMetadataVersion(); @@ -44,20 +42,19 @@ public byte[] deserializeAvro(final EnvelopeHolder envelope) throws RuntimeExcep final GenericRecord metadata = metadataDecoder.read(envelope.getMetadata()); - metadata.put(NakadiAvroMetadata.OCCURRED_AT, new DateTime( - (long) metadata.get(NakadiAvroMetadata.OCCURRED_AT), DateTimeZone.UTC).toString()); + metadata.put("occurred_at", new DateTime( + (long) metadata.get("occurred_at"), DateTimeZone.UTC).toString()); - final var receivedAt = metadata.get(NakadiAvroMetadata.RECEIVED_AT); + final var receivedAt = metadata.get("received_at"); if (receivedAt != null) { - metadata.put(NakadiAvroMetadata.RECEIVED_AT, new DateTime( + metadata.put("received_at", new DateTime( (long) receivedAt, DateTimeZone.UTC).toString()); } - final String eventType = metadata.get(NakadiAvroMetadata.EVENT_TYPE).toString(); - + final String eventType = metadata.get("event_type").toString(); final SequenceDecoder eventDecoder = eventSequenceDecoders.computeIfAbsent( - metadata.get(NakadiAvroMetadata.SCHEMA_VERSION).toString(), + metadata.get("version").toString(), (v) -> new SequenceDecoder( schemaService.getAvroSchema(eventType, v)) ); @@ -74,6 +71,7 @@ public byte[] deserializeAvro(final EnvelopeHolder envelope) throws RuntimeExcep } catch (final IOException io) { throw new RuntimeException("failed to deserialize avro event", io); } + } private static JSONObject getJsonWithNonNullValues(final String json) { diff --git a/core-services/src/main/java/org/zalando/nakadi/repository/kafka/KafkaRecordDeserializer.java b/core-services/src/main/java/org/zalando/nakadi/repository/kafka/KafkaRecordDeserializer.java index 3777510a24..70f8780c74 100644 --- a/core-services/src/main/java/org/zalando/nakadi/repository/kafka/KafkaRecordDeserializer.java +++ 
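On the legacy path, AvroDeserializerWithSequenceDecoder above rewrites the epoch-millis timestamps into ISO-8601 strings before the JSON reaches consumers. Concretely, with Joda-Time in UTC as in its imports (the literal below is the millis form of the expected value used in KafkaRecordDeserializerTest further down):

    final String occurredAt = new DateTime(1643290232172L, DateTimeZone.UTC).toString();
    // yields "2022-01-27T13:30:32.172Z"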
b/core-services/src/main/java/org/zalando/nakadi/repository/kafka/KafkaRecordDeserializer.java @@ -1,31 +1,65 @@ package org.zalando.nakadi.repository.kafka; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.RawMessageDecoder; +import org.apache.avro.specific.SpecificData; +import org.json.JSONObject; import org.zalando.nakadi.domain.EnvelopeHolder; -import org.zalando.nakadi.domain.NakadiRecord; +import org.zalando.nakadi.generated.avro.Envelope; +import org.zalando.nakadi.generated.avro.Metadata; +import org.zalando.nakadi.mapper.NakadiRecordMapper; import org.zalando.nakadi.service.LocalSchemaRegistry; import org.zalando.nakadi.service.SchemaProviderService; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class KafkaRecordDeserializer implements RecordDeserializer { + // https://avro.apache.org/docs/current/spec.html#single_object_encoding_spec + private static final byte[] AVRO_V1_HEADER = new byte[]{(byte) 0xC3, (byte) 0x01}; + + private static final Map<Schema, RawMessageDecoder<GenericRecord>> RAW_DECODERS = new ConcurrentHashMap<>(); + private static final Map<Schema, BinaryMessageDecoder<GenericRecord>> BINARY_DECODERS = new ConcurrentHashMap<>(); + private final AvroDeserializerWithSequenceDecoder decoder; + private final SchemaProviderService schemaService; + private final NakadiRecordMapper nakadiRecordMapper; - public KafkaRecordDeserializer(final SchemaProviderService schemaService, + public KafkaRecordDeserializer(final NakadiRecordMapper nakadiRecordMapper, + final SchemaProviderService schemaService, final LocalSchemaRegistry localSchemaRegistry) { + this.nakadiRecordMapper = nakadiRecordMapper; + this.schemaService = schemaService; this.decoder = new AvroDeserializerWithSequenceDecoder(schemaService, localSchemaRegistry); } - public byte[] deserialize(final byte[] eventFormat, final byte[] data) { + public byte[] deserializeToJsonBytes(final byte[] eventFormat, final byte[] data) { + if (data == null) { + return null; + } + if (eventFormat == null) { - // JSON + if (data[0] == AVRO_V1_HEADER[0] && data[1] == AVRO_V1_HEADER[1]) { + final Envelope envelope = nakadiRecordMapper.fromBytesEnvelope(new ByteArrayInputStream(data)); + return deserializeToJsonBytes(envelope); + } + + // then it should be JSON return data; } - if (Arrays.equals(eventFormat, NakadiRecord.Format.AVRO.getFormat())) { + if (eventFormat.length == 1 && + eventFormat[0] == NakadiRecordMapper.AVRO_FORMAT[0]) { try { - return decoder.deserializeAvro(EnvelopeHolder.fromBytes(data)); + return decoder.deserializeAvroToJsonBytes(EnvelopeHolder.fromBytes(data)); } catch (IOException e) { throw new RuntimeException("failed to deserialize avro event", e); } @@ -35,4 +69,48 @@ public byte[] deserialize(final byte[] eventFormat, final byte[] data) { "event format is not defined, provided format: `%s`", Arrays.toString(eventFormat))); } + + private byte[] deserializeToJsonBytes(final Envelope envelope) { + try { + final Metadata metadata = envelope.getMetadata(); + final Schema schema = schemaService.getAvroSchema( + metadata.getEventType(), metadata.getVersion()); + + final GenericRecord event; + if (envelope.getPayload().array()[0] == AVRO_V1_HEADER[0] && + envelope.getPayload().array()[1] == AVRO_V1_HEADER[1]) { + final BinaryMessageDecoder<GenericRecord> decoder = BINARY_DECODERS.computeIfAbsent( + schema, (s) -> new BinaryMessageDecoder<>(GenericData.get(), s) + ); + event = decoder.decode(envelope.getPayload()); + } else { + final RawMessageDecoder<GenericRecord> decoder = RAW_DECODERS.computeIfAbsent( + schema, (s) -> new RawMessageDecoder<>(SpecificData.get(), s) + ); + event = decoder.decode(envelope.getPayload()); + } + final StringBuilder sEvent = new StringBuilder(event.toString()); + final var sanitizedMetadata = mapToJson(metadata).toString(); + + sEvent.deleteCharAt(sEvent.length() - 1) + .append(", \"metadata\":") + .append(sanitizedMetadata).append('}'); + + return sEvent.toString().getBytes(StandardCharsets.UTF_8); + } catch (final IOException io) { + throw new RuntimeException("failed to deserialize avro event", io); + } + } + + private JSONObject mapToJson(final Metadata metadata) { + final JSONObject metadataObj = new JSONObject(metadata.toString()); + final var iterator = metadataObj.keys(); + while (iterator.hasNext()) { + final var key = iterator.next(); + if (metadataObj.get(key).equals(JSONObject.NULL)) { + iterator.remove(); + } + } + return metadataObj; + } } diff --git a/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventProcessor.java b/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventProcessor.java index a8f3e32eeb..c03dc6716a 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventProcessor.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/publishing/BinaryEventProcessor.java @@ -55,7 +55,7 @@ public void sendEvents(final String etName, final List<NakadiRecord> events) { eventRecordMetadata.stream() .filter(nrr -> nrr.getStatus() != Status.SUCCEEDED) .forEach(nrr -> - LOG.warn("failed to publish events to {} status {} exception {}", + LOG.warn("failed to publish events to {} status {}", etName, nrr.getStatus(), nrr.getException())); } catch (final RuntimeException ex) { LOG.error("failed to send single batch for unknown reason", ex); diff --git a/core-services/src/main/java/org/zalando/nakadi/service/publishing/NakadiKpiPublisher.java b/core-services/src/main/java/org/zalando/nakadi/service/publishing/NakadiKpiPublisher.java index 7003b267cd..eb6b6eae73 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/publishing/NakadiKpiPublisher.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/publishing/NakadiKpiPublisher.java @@ -7,7 +7,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.zalando.nakadi.domain.Feature; -import org.zalando.nakadi.domain.NakadiAvroMetadata; +import org.zalando.nakadi.domain.NakadiMetadata; import org.zalando.nakadi.domain.NakadiRecord; import org.zalando.nakadi.domain.kpi.AccessLogEvent; import org.zalando.nakadi.domain.kpi.BatchPublishedEvent; @@ -15,6 +15,7 @@ import org.zalando.nakadi.domain.kpi.EventTypeLogEvent; import org.zalando.nakadi.domain.kpi.KPIEvent; import org.zalando.nakadi.domain.kpi.SubscriptionLogEvent; +import org.zalando.nakadi.mapper.NakadiRecordMapper; import org.zalando.nakadi.security.UsernameHasher; import org.zalando.nakadi.service.FeatureToggleService; import org.zalando.nakadi.service.KPIEventMapper; @@ -23,6 +24,7 @@ import org.zalando.nakadi.util.FlowIdUtils; import org.zalando.nakadi.util.UUIDGenerator; +import java.time.Instant; import java.util.Set; import java.util.function.Supplier; @@ -83,8 +85,7 @@ public void publish(final Supplier<KPIEvent> kpiEventSupplier) { if
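The dispatch in KafkaRecordDeserializer.deserializeToJsonBytes above reduces to three wire cases: no format header and a payload starting with 0xC3 0x01 means a new-style Envelope to convert, no header and no marker means the payload is already JSON and passes through, and the one-byte AVRO_FORMAT header means the old EnvelopeHolder framing. The marker check, pulled out as a standalone helper for clarity (hypothetical name, same logic):

    static boolean isSingleObjectEncoded(final byte[] data) {
        // 0xC3 0x01 per the Avro single-object encoding spec linked above; the
        // length guard avoids an out-of-bounds read on degenerate payloads.
        return data != null && data.length > 1
                && data[0] == (byte) 0xC3 && data[1] == (byte) 0x01;
    }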
(featureToggleService.isFeatureEnabled(Feature.AVRO_FOR_KPI_EVENTS)) { final String eventVersion = schemaService.getAvroSchemaVersion( eventTypeName, kpiEvent.getSchema()); - final NakadiAvroMetadata metadata = buildMetaData( - eventTypeName, VERSION_METADATA, eventVersion); + final NakadiMetadata metadata = buildMetadata(eventTypeName, eventVersion); final GenericRecord event = kpiEventMapper.mapToGenericRecord(kpiEvent); final NakadiRecord nakadiRecord = @@ -100,16 +101,10 @@ public void publish(final Supplier kpiEventSupplier) { } } - private NakadiAvroMetadata buildMetaData(final String eventTypeName, - final String metadataVersion, - final String eventVersion) { - final var metaSchemaEntry = localSchemaRegistry - .getEventTypeSchema(LocalSchemaRegistry.METADATA_KEY, VERSION_METADATA); - - final var metadata = new NakadiAvroMetadata( - Byte.parseByte(metadataVersion), - metaSchemaEntry); - metadata.setOccurredAt(System.currentTimeMillis()); + private NakadiMetadata buildMetadata(final String eventTypeName, + final String eventVersion) { + final NakadiMetadata metadata = new NakadiMetadata(); + metadata.setOccurredAt(Instant.now()); metadata.setEid(uuidGenerator.randomUUID().toString()); metadata.setEventType(eventTypeName); metadata.setSchemaVersion(eventVersion); diff --git a/core-services/src/main/java/org/zalando/nakadi/service/publishing/NakadiRecordMapper.java b/core-services/src/main/java/org/zalando/nakadi/service/publishing/NakadiRecordMapper.java deleted file mode 100644 index 64831659d5..0000000000 --- a/core-services/src/main/java/org/zalando/nakadi/service/publishing/NakadiRecordMapper.java +++ /dev/null @@ -1,72 +0,0 @@ -package org.zalando.nakadi.service.publishing; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.EncoderFactory; -import org.springframework.stereotype.Service; -import org.zalando.nakadi.domain.NakadiAvroMetadata; -import org.zalando.nakadi.domain.NakadiRecord; -import org.zalando.nakadi.service.LocalSchemaRegistry; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; - -@Service -public class NakadiRecordMapper { - - private final LocalSchemaRegistry localSchemaRegistry; - - public NakadiRecordMapper(final LocalSchemaRegistry localSchemaRegistry) { - this.localSchemaRegistry = localSchemaRegistry; - - } - - public List fromBytesBatch(final byte[] batch) throws IOException { - final List records = new LinkedList<>(); - final ByteBuffer tmp = ByteBuffer.wrap(batch); - while (tmp.hasRemaining()) { - final int recordStart = tmp.position(); - final byte metadataVersion = tmp.get(); - final int metadataLength = tmp.getInt(); - final byte[] metadata = new byte[metadataLength]; - tmp.get(metadata); - - final int payloadLength = tmp.getInt(); - final byte[] payload = new byte[payloadLength]; - tmp.position(recordStart + 1 + 4 + metadataLength + 4); - tmp.get(payload); - - // fixme use version as byte in local registry - final Schema metadataAvroSchema = localSchemaRegistry.getEventTypeSchema( - LocalSchemaRegistry.METADATA_KEY, String.valueOf(metadataVersion)); - final NakadiAvroMetadata nakadiAvroMetadata = new NakadiAvroMetadata( - metadataVersion, metadataAvroSchema, metadata); - - records.add(new NakadiRecord() - .setMetadata(nakadiAvroMetadata) - .setPayload(payload)); - } - - return records; - } - - public NakadiRecord fromAvroGenericRecord(final 
NakadiAvroMetadata metadata, - final GenericRecord event) throws IOException { - - final var payloadOutputStream = new ByteArrayOutputStream(); - final var eventWriter = new GenericDatumWriter(event.getSchema()); - eventWriter.write(event, EncoderFactory.get() - .directBinaryEncoder(payloadOutputStream, null)); - - return new NakadiRecord() - .setMetadata(metadata) - .setEventKey(null) // fixme remove it once event key implemented - .setPayload(payloadOutputStream.toByteArray()) - .setFormat(NakadiRecord.Format.AVRO.getFormat()); - } - -} diff --git a/core-services/src/main/java/org/zalando/nakadi/service/timeline/MultiTimelineEventConsumer.java b/core-services/src/main/java/org/zalando/nakadi/service/timeline/MultiTimelineEventConsumer.java index 89811eac67..183df70605 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/timeline/MultiTimelineEventConsumer.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/timeline/MultiTimelineEventConsumer.java @@ -11,6 +11,7 @@ import org.zalando.nakadi.exceptions.runtime.InvalidCursorException; import org.zalando.nakadi.exceptions.runtime.NakadiRuntimeException; import org.zalando.nakadi.exceptions.runtime.ServiceTemporarilyUnavailableException; +import org.zalando.nakadi.mapper.NakadiRecordMapper; import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.repository.TopicRepository; import org.zalando.nakadi.repository.kafka.KafkaFactory; @@ -68,6 +69,7 @@ public class MultiTimelineEventConsumer implements EventConsumer.ReassignableEve private final Comparator comparator; private final SchemaProviderService schemaService; private final LocalSchemaRegistry localSchemaRegistry; + private final NakadiRecordMapper nakadiRecordMapper; public MultiTimelineEventConsumer( final String clientId, @@ -75,13 +77,15 @@ public MultiTimelineEventConsumer( final TimelineSync timelineSync, final Comparator comparator, final SchemaProviderService schemaService, - final LocalSchemaRegistry localSchemaRegistry) { + final LocalSchemaRegistry localSchemaRegistry, + final NakadiRecordMapper nakadiRecordMapper) { this.clientId = clientId; this.timelineService = timelineService; this.timelineSync = timelineSync; this.comparator = comparator; this.schemaService = schemaService; this.localSchemaRegistry = localSchemaRegistry; + this.nakadiRecordMapper = nakadiRecordMapper; } @Override @@ -258,7 +262,8 @@ private void electTopicRepositories() throws InvalidCursorException { clientId, Arrays.deepToString(entry.getValue().toArray())); final EventConsumer.LowLevelConsumer consumer = repo.createEventConsumer( - clientId, entry.getValue(), new KafkaRecordDeserializer(schemaService, localSchemaRegistry)); + clientId, entry.getValue(), new KafkaRecordDeserializer( + nakadiRecordMapper, schemaService, localSchemaRegistry)); eventConsumers.put(repo, consumer); } } diff --git a/core-services/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java b/core-services/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java index 8b9deb3f64..f85a0a9122 100644 --- a/core-services/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java +++ b/core-services/src/main/java/org/zalando/nakadi/service/timeline/TimelineService.java @@ -39,6 +39,7 @@ import org.zalando.nakadi.exceptions.runtime.TopicDeletionException; import org.zalando.nakadi.exceptions.runtime.TopicRepositoryException; import org.zalando.nakadi.exceptions.runtime.UnableProcessException; +import org.zalando.nakadi.mapper.NakadiRecordMapper; 
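The deleted core-services NakadiRecordMapper above is also a useful record of the hand-rolled batch framing that the Avro envelope replaces. For reference, one record in that legacy layout is produced like this (a sketch mirroring the deleted parser; metadataVersion, metadata and payload are local byte inputs):

    final ByteBuffer frame = ByteBuffer.allocate(1 + 4 + metadata.length + 4 + payload.length)
            .put(metadataVersion)    // 1 byte: metadata schema version
            .putInt(metadata.length) // 4 bytes: metadata length
            .put(metadata)
            .putInt(payload.length)  // 4 bytes: payload length
            .put(payload);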
import org.zalando.nakadi.plugin.api.authz.AuthorizationService; import org.zalando.nakadi.repository.EventConsumer; import org.zalando.nakadi.repository.NakadiTopicConfig; @@ -79,6 +80,7 @@ public class TimelineService { // one man said, it is fine to add 11th argument private final SchemaProviderService schemaService; private final LocalSchemaRegistry localSchemaRegistry; + private final NakadiRecordMapper nakadiRecordMapper; @Autowired public TimelineService(final EventTypeCache eventTypeCache, @@ -92,7 +94,8 @@ public TimelineService(final EventTypeCache eventTypeCache, final FeatureToggleService featureToggleService, @Value("${nakadi.timelines.storage.compacted}") final String compactedStorageName, final SchemaProviderService schemaService, - final LocalSchemaRegistry localSchemaRegistry) { + final LocalSchemaRegistry localSchemaRegistry, + final NakadiRecordMapper nakadiRecordMapper) { this.eventTypeCache = eventTypeCache; this.storageDbRepository = storageDbRepository; this.timelineSync = timelineSync; @@ -105,6 +108,7 @@ public TimelineService(final EventTypeCache eventTypeCache, this.compactedStorageName = compactedStorageName; this.schemaService = schemaService; this.localSchemaRegistry = localSchemaRegistry; + this.nakadiRecordMapper = nakadiRecordMapper; } public Timeline createTimeline(final String eventTypeName, final String storageId) @@ -286,7 +290,7 @@ public EventConsumer createEventConsumer(@Nullable final String clientId, final throws InvalidCursorException { final MultiTimelineEventConsumer result = new MultiTimelineEventConsumer( clientId, this, timelineSync, - new NakadiCursorComparator(eventTypeCache), schemaService, localSchemaRegistry); + new NakadiCursorComparator(eventTypeCache), schemaService, localSchemaRegistry, nakadiRecordMapper); result.reassign(positions); return result; } @@ -294,7 +298,7 @@ public EventConsumer createEventConsumer(@Nullable final String clientId, final public EventConsumer.ReassignableEventConsumer createEventConsumer(@Nullable final String clientId) { return new MultiTimelineEventConsumer( clientId, this, timelineSync, - new NakadiCursorComparator(eventTypeCache), schemaService, localSchemaRegistry); + new NakadiCursorComparator(eventTypeCache), schemaService, localSchemaRegistry, nakadiRecordMapper); } private void switchTimelines(final Timeline activeTimeline, final Timeline nextTimeline) diff --git a/core-services/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java b/core-services/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java index 986807f214..bdb5a13954 100644 --- a/core-services/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java +++ b/core-services/src/test/java/org/zalando/nakadi/enrichment/MetadataEnrichmentStrategyTest.java @@ -1,25 +1,24 @@ package org.zalando.nakadi.enrichment; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.avro.AvroMapper; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.joda.time.DateTimeUtils; import org.json.JSONObject; import org.junit.Test; import org.mockito.Mockito; -import org.springframework.core.io.DefaultResourceLoader; import org.zalando.nakadi.domain.BatchItem; import org.zalando.nakadi.domain.EventType; -import org.zalando.nakadi.domain.NakadiAvroMetadata; +import org.zalando.nakadi.domain.NakadiMetadata; import org.zalando.nakadi.domain.NakadiRecord; import 
org.zalando.nakadi.exceptions.runtime.EnrichmentException; +import org.zalando.nakadi.mapper.NakadiRecordMapper; import org.zalando.nakadi.plugin.api.authz.AuthorizationService; import org.zalando.nakadi.service.LocalSchemaRegistry; -import org.zalando.nakadi.service.publishing.NakadiRecordMapper; import org.zalando.nakadi.util.FlowIdUtils; +import org.zalando.nakadi.utils.TestUtils; import java.io.IOException; +import java.time.Instant; import java.util.Optional; import java.util.UUID; @@ -41,8 +40,7 @@ public class MetadataEnrichmentStrategyTest { private LocalSchemaRegistry localSchemaRegistry; public MetadataEnrichmentStrategyTest() throws IOException { - this.localSchemaRegistry = new LocalSchemaRegistry(new AvroMapper(), new ObjectMapper(), - new DefaultResourceLoader().getResource("event-type-schema/")); + this.localSchemaRegistry = TestUtils.getLocalSchemaRegistry(); } @Test @@ -199,12 +197,12 @@ public void testNakadiRecordEnrichment() throws IOException { private NakadiRecord getTestNakadiRecord() throws IOException { - final long now = System.currentTimeMillis(); + final Instant now = Instant.now(); final var nakadiAccessLog = "nakadi.access.log"; final var latestMeta = localSchemaRegistry.getLatestEventTypeSchemaVersion(LocalSchemaRegistry.METADATA_KEY); final var latestSchema = localSchemaRegistry.getLatestEventTypeSchemaVersion(nakadiAccessLog); - final var nakadiAvroMetadata = new NakadiAvroMetadata(latestMeta.getVersionAsByte(), latestMeta.getSchema()); + final var nakadiAvroMetadata = new NakadiMetadata(); nakadiAvroMetadata.setOccurredAt(now); nakadiAvroMetadata.setEid(UUID.randomUUID().toString()); nakadiAvroMetadata.setFlowId("test-flow"); diff --git a/core-services/src/test/java/org/zalando/nakadi/repository/kafka/KafkaRecordDeserializerTest.java b/core-services/src/test/java/org/zalando/nakadi/repository/kafka/KafkaRecordDeserializerTest.java index 1e07a449be..2e054ae2ee 100644 --- a/core-services/src/test/java/org/zalando/nakadi/repository/kafka/KafkaRecordDeserializerTest.java +++ b/core-services/src/test/java/org/zalando/nakadi/repository/kafka/KafkaRecordDeserializerTest.java @@ -1,25 +1,32 @@ package org.zalando.nakadi.repository.kafka; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.avro.AvroMapper; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.io.EncoderFactory; +import org.apache.avro.message.BinaryMessageEncoder; import org.json.JSONObject; import org.junit.Assert; import org.junit.Test; import org.springframework.core.io.DefaultResourceLoader; import org.springframework.core.io.Resource; import org.zalando.nakadi.domain.EnvelopeHolder; -import org.zalando.nakadi.domain.NakadiRecord; import org.zalando.nakadi.domain.VersionedAvroSchema; +import org.zalando.nakadi.generated.avro.Envelope; +import org.zalando.nakadi.generated.avro.Metadata; +import org.zalando.nakadi.mapper.NakadiRecordMapper; import org.zalando.nakadi.service.LocalSchemaRegistry; import org.zalando.nakadi.service.SchemaProviderService; import org.zalando.nakadi.service.TestSchemaProviderService; +import org.zalando.nakadi.util.AvroUtils; +import org.zalando.nakadi.utils.TestUtils; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Instant; import java.util.Optional; public class 
KafkaRecordDeserializerTest { @@ -28,19 +35,37 @@ public class KafkaRecordDeserializerTest { private static final String SOME_TIME_DATE_STRING = "2022-01-27T13:30:32.172Z"; private final LocalSchemaRegistry localSchemaRegistry; private final SchemaProviderService schemaService; - private VersionedAvroSchema metadataSchema; + private final VersionedAvroSchema metadataSchema; + private final NakadiRecordMapper nakadiRecordMapper; + private final Schema schema = AvroUtils.getParsedSchema(new DefaultResourceLoader() + .getResource("test.deserialize.avro.avsc").getInputStream()); + private final KafkaRecordDeserializer deserializer; public KafkaRecordDeserializerTest() throws IOException { // FIXME: doesn't work without the trailing slash - final Resource eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/"); - localSchemaRegistry = new LocalSchemaRegistry(new AvroMapper(), new ObjectMapper(), eventTypeRes); + final Resource eventTypeRes = new DefaultResourceLoader().getResource("avro-schema/"); + localSchemaRegistry = TestUtils.getLocalSchemaRegistry(); schemaService = new TestSchemaProviderService(localSchemaRegistry); metadataSchema = localSchemaRegistry.getLatestEventTypeSchemaVersion(LocalSchemaRegistry.METADATA_KEY); + nakadiRecordMapper = new NakadiRecordMapper(localSchemaRegistry); + final SchemaProviderService singleSchemaProvider = new SchemaProviderService() { + @Override + public Schema getAvroSchema(final String etName, final String version) { + return schema; + } + + @Override + public String getAvroSchemaVersion(final String etName, final Schema schema) { + return null; + } + }; + deserializer = new KafkaRecordDeserializer(nakadiRecordMapper, singleSchemaProvider, localSchemaRegistry); } @Test public void testDeserializeAvro() throws IOException { - final KafkaRecordDeserializer deserializer = new KafkaRecordDeserializer(schemaService, localSchemaRegistry); + final KafkaRecordDeserializer deserializer = new KafkaRecordDeserializer( + nakadiRecordMapper, schemaService, localSchemaRegistry); final JSONObject jsonObject = new JSONObject() .put("flow_id", "hek") @@ -59,12 +84,12 @@ public void testDeserializeAvro() throws IOException { getEventWriter2()); // try to deserialize that data when we would read Kafka record - final byte[] deserializedEvent0 = deserializer.deserialize( - NakadiRecord.Format.AVRO.getFormat(), + final byte[] deserializedEvent0 = deserializer.deserializeToJsonBytes( + NakadiRecordMapper.AVRO_FORMAT, data0 ); - final byte[] deserializedEvent1 = deserializer.deserialize( - NakadiRecord.Format.AVRO.getFormat(), + final byte[] deserializedEvent1 = deserializer.deserializeToJsonBytes( + NakadiRecordMapper.AVRO_FORMAT, data1 ); @@ -75,6 +100,14 @@ public void testDeserializeAvro() throws IOException { getExpectedNode2().similar(new JSONObject(new String(deserializedEvent1)))); } + @Test + public void testDeserializeAvroNullEventInLogCompactedEventType() { + final KafkaRecordDeserializer deserializer = new KafkaRecordDeserializer( + nakadiRecordMapper, schemaService, localSchemaRegistry); + + Assert.assertNull(deserializer.deserializeToJsonBytes(null, null)); + } + @Test public void testDeserializeAvroMetadata0() throws IOException { final JSONObject jsonObject = new JSONObject() @@ -87,9 +120,72 @@ public void testDeserializeAvroMetadata0() throws IOException { Assert.assertTrue(expectedJson.similar(actualJson)); } + @Test + public void testDeserializeAvroEnvelope() throws IOException { + final ByteArrayOutputStream payload = new 
ByteArrayOutputStream(); + new GenericDatumWriter<>(schema).write( + new GenericRecordBuilder(schema).set("foo", "bar").build(), + EncoderFactory.get().directBinaryEncoder(payload, null)); + + final String expectedOccurredAt = "2022-06-15T15:17:00Z"; + final Instant now = Instant.parse(expectedOccurredAt); + final Envelope envelope = Envelope.newBuilder() + .setMetadata(Metadata.newBuilder() + .setEid("4623130E-2983-4134-A472-F35154CFF980") + .setEventOwner("nakadi") + .setFlowId("xxx-event-flow-id") + .setOccurredAt(now) + .setEventType("nakadi-test-event-type") + .setVersion("1.0.0") + .build()) + .setPayload(ByteBuffer.wrap(payload.toByteArray())) + .build(); + + final ByteBuffer byteBuffer = Envelope.getEncoder().encode(envelope); + final byte[] jsonBytes = deserializer.deserializeToJsonBytes(null, byteBuffer.array()); + + final JSONObject event = new JSONObject(new String(jsonBytes)); + Assert.assertEquals("bar", event.get("foo")); + Assert.assertEquals("4623130E-2983-4134-A472-F35154CFF980", + event.getJSONObject("metadata").get("eid")); + Assert.assertEquals(expectedOccurredAt, + event.getJSONObject("metadata").get("occurred_at")); + } + + @Test + public void testDeserializeAvroSingleObjectEncoding() throws IOException { + final ByteBuffer payload = new BinaryMessageEncoder(GenericData.get(), schema) + .encode(new GenericRecordBuilder(schema).set("foo", "bar").build()); + + final String expectedOccurredAt = "2022-06-15T15:17:00Z"; + final Instant now = Instant.parse(expectedOccurredAt); + final Envelope envelope = Envelope.newBuilder() + .setMetadata(Metadata.newBuilder() + .setEid("4623130E-2983-4134-A472-F35154CFF980") + .setEventOwner("nakadi") + .setFlowId("xxx-event-flow-id") + .setOccurredAt(now) + .setEventType("nakadi-test-event-type") + .setVersion("1.0.0") + .build()) + .setPayload(ByteBuffer.wrap(payload.array())) + .build(); + + final ByteBuffer byteBuffer = Envelope.getEncoder().encode(envelope); + final byte[] jsonBytes = deserializer.deserializeToJsonBytes(null, byteBuffer.array()); + + final JSONObject event = new JSONObject(new String(jsonBytes)); + Assert.assertEquals("bar", event.get("foo")); + Assert.assertEquals("4623130E-2983-4134-A472-F35154CFF980", + event.getJSONObject("metadata").get("eid")); + Assert.assertEquals(expectedOccurredAt, + event.getJSONObject("metadata").get("occurred_at")); + } + private JSONObject getSerializedJsonObject(final VersionedAvroSchema metadataVersion, final JSONObject metadataOverride) throws IOException { - final KafkaRecordDeserializer deserializer = new KafkaRecordDeserializer(schemaService, localSchemaRegistry); + final KafkaRecordDeserializer deserializer = new KafkaRecordDeserializer( + nakadiRecordMapper, schemaService, localSchemaRegistry); final var eventWriter = getEventWriter1(); @@ -100,8 +196,8 @@ private JSONObject getSerializedJsonObject(final VersionedAvroSchema metadataVer eventWriter); // try to deserialize that data when we would read Kafka record - final byte[] deserializedEvent = deserializer.deserialize( - NakadiRecord.Format.AVRO.getFormat(), + final byte[] deserializedEvent = deserializer.deserializeToJsonBytes( + NakadiRecordMapper.AVRO_FORMAT, data ); diff --git a/core-services/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java b/core-services/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java index bcf585a26e..206e4ddd46 100644 --- a/core-services/src/test/java/org/zalando/nakadi/service/CursorOperationsServiceTest.java +++ 
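The two new tests above cover both payload encodings accepted inside an Envelope, which is exactly the distinction the deserializer sniffs for. A compact sketch of how each is produced (schema is the single-field record from test.deserialize.avro.avsc):

    static void encodeBothWays(final Schema schema) throws IOException {
        final GenericRecord rec = new GenericRecordBuilder(schema).set("foo", "bar").build();

        // (a) plain binary body, no marker; read back with a RawMessageDecoder
        final ByteArrayOutputStream raw = new ByteArrayOutputStream();
        new GenericDatumWriter<GenericRecord>(schema)
                .write(rec, EncoderFactory.get().directBinaryEncoder(raw, null));

        // (b) single-object encoding, 0xC3 0x01 plus fingerprint up front; read
        // back with a BinaryMessageDecoder
        final ByteBuffer framed =
                new BinaryMessageEncoder<GenericRecord>(GenericData.get(), schema).encode(rec);
    }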
@@ -1,6 +1,5 @@
 package org.zalando.nakadi.service;
 
-import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.Lists;
 import org.hamcrest.CoreMatchers;
 import org.junit.Test;
@@ -17,7 +16,6 @@
 import org.zalando.nakadi.repository.kafka.KafkaTopicConfigFactory;
 import org.zalando.nakadi.repository.kafka.KafkaTopicRepository;
 import org.zalando.nakadi.repository.kafka.KafkaZookeeper;
-import org.zalando.nakadi.repository.zookeeper.ZookeeperSettings;
 import org.zalando.nakadi.service.timeline.TimelineService;
 
 import javax.annotation.Nullable;
@@ -316,10 +314,8 @@ private Timeline mockTimeline(final int order, @Nullable final Long latestOffset
                 .setKafkaFactory(Mockito.mock(KafkaFactory.class))
                 .setNakadiSettings(Mockito.mock(NakadiSettings.class))
                 .setKafkaSettings(Mockito.mock(KafkaSettings.class))
-                .setZookeeperSettings(Mockito.mock(ZookeeperSettings.class))
                 .setKafkaTopicConfigFactory(Mockito.mock(KafkaTopicConfigFactory.class))
                 .setKafkaLocationManager(Mockito.mock(KafkaLocationManager.class))
-                .setMetricRegistry(new MetricRegistry())
                 .build();
 
         Mockito.when(timelineService.getTopicRepository(timeline)).thenReturn(repository);
diff --git a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventOwnerExtractorTest.java b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventOwnerExtractorTest.java
index 69160ebd04..0290e5dba4 100644
--- a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventOwnerExtractorTest.java
+++ b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventOwnerExtractorTest.java
@@ -4,7 +4,6 @@
 import org.junit.Assert;
 import org.junit.Test;
 import org.zalando.nakadi.domain.EventOwnerHeader;
-import org.zalando.nakadi.domain.NakadiAvroMetadata;
 import org.zalando.nakadi.domain.NakadiMetadata;
 import org.zalando.nakadi.domain.StrictJsonParser;
 
@@ -14,7 +13,7 @@ public class EventOwnerExtractorTest {
     private static final JSONObject MOCK_EVENT = StrictJsonParser.parse("{" +
             "\"other\": null, \n" +
             "\"example\": {\n" +
-            "\"security\": {\"final\": \"test_value\"}}" +
+            "\"security\": {\"final\": \"test_value\"}}" +
             "}", false);
 
     @Test
@@ -55,7 +54,7 @@ public void testCorrectValueSetWhenStatic() {
 
     @Test
     public void testCorrectValueSetWhenMetadata() throws IOException {
-        final NakadiMetadata metadata = new NakadiAvroMetadata((byte) 1, null);
+        final NakadiMetadata metadata = new NakadiMetadata();
         metadata.setEventOwner("owner-123");
 
         final EventOwnerExtractor extractor = EventOwnerExtractorFactory.createMetadataExtractor("retailer_id");
diff --git a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java
index c67ca9a330..cd1f9c306d 100644
--- a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java
+++ b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java
@@ -1,7 +1,5 @@
 package org.zalando.nakadi.service.publishing;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.avro.AvroMapper;
 import com.google.common.collect.ImmutableList;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
@@ -23,7 +21,6 @@
 import org.zalando.nakadi.domain.EventPublishingStep;
 import org.zalando.nakadi.domain.EventType;
 import org.zalando.nakadi.domain.EventTypeBase;
-import org.zalando.nakadi.domain.NakadiAvroMetadata;
 import org.zalando.nakadi.domain.NakadiMetadata;
 import org.zalando.nakadi.domain.NakadiRecord;
 import org.zalando.nakadi.domain.Timeline;
@@ -33,6 +30,7 @@
 import org.zalando.nakadi.exceptions.runtime.EventPublishingException;
 import org.zalando.nakadi.exceptions.runtime.EventTypeTimeoutException;
 import org.zalando.nakadi.exceptions.runtime.PartitioningException;
+import org.zalando.nakadi.mapper.NakadiRecordMapper;
 import org.zalando.nakadi.partitioning.PartitionResolver;
 import org.zalando.nakadi.partitioning.PartitionStrategy;
 import org.zalando.nakadi.plugin.api.authz.Resource;
@@ -47,6 +45,7 @@
 import org.zalando.nakadi.validation.ValidationError;
 
 import java.io.Closeable;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -593,9 +592,8 @@ public void testWrite() throws Exception {
     @Test
     public void testAvroEventWasSerialized() throws Exception {
         final org.springframework.core.io.Resource eventTypeRes =
-                new DefaultResourceLoader().getResource("event-type-schema/");
-        final LocalSchemaRegistry localSchemaRegistry = new LocalSchemaRegistry(
-                new AvroMapper(), new ObjectMapper(), eventTypeRes);
+                new DefaultResourceLoader().getResource("avro-schema/");
+        final LocalSchemaRegistry localSchemaRegistry = new LocalSchemaRegistry(eventTypeRes);
         final BinaryEventPublisher eventPublisher = new BinaryEventPublisher(
                 timelineService, timelineSync, nakadiSettings);
         final EventType eventType = buildDefaultEventType();
@@ -607,15 +605,14 @@ public void testAvroEventWasSerialized() throws Exception {
         Mockito.when(partitionResolver.resolvePartition(any(EventType.class), any(NakadiMetadata.class)))
                 .thenReturn("1");
 
-        final long now = System.currentTimeMillis();
+        final Instant now = Instant.now();
 
         final var latestMeta =
                 localSchemaRegistry.getLatestEventTypeSchemaVersion(LocalSchemaRegistry.METADATA_KEY);
 
         final var latestSchema = localSchemaRegistry.getLatestEventTypeSchemaVersion("nakadi.access.log");
 
-        final NakadiAvroMetadata metadata = new NakadiAvroMetadata(
-                latestMeta.getVersionAsByte(), latestMeta.getSchema());
+        final NakadiMetadata metadata = new NakadiMetadata();
         metadata.setOccurredAt(now);
         metadata.setEid("9702cf96-9bdb-48b7-9f4c-92643cb6d9fc");
         metadata.setFlowId(FlowIdUtils.peek());
diff --git a/core-services/src/test/java/org/zalando/nakadi/service/publishing/NakadiKpiPublisherTest.java b/core-services/src/test/java/org/zalando/nakadi/service/publishing/NakadiKpiPublisherTest.java
index ebbd3da668..3e26117113 100644
--- a/core-services/src/test/java/org/zalando/nakadi/service/publishing/NakadiKpiPublisherTest.java
+++ b/core-services/src/test/java/org/zalando/nakadi/service/publishing/NakadiKpiPublisherTest.java
@@ -1,7 +1,5 @@
 package org.zalando.nakadi.service.publishing;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.avro.AvroMapper;
 import org.json.JSONObject;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -17,14 +15,16 @@
 import org.zalando.nakadi.domain.Timeline;
 import org.zalando.nakadi.domain.kpi.KPIEvent;
 import org.zalando.nakadi.domain.kpi.SubscriptionLogEvent;
+import org.zalando.nakadi.mapper.NakadiRecordMapper;
 import org.zalando.nakadi.repository.TopicRepository;
 import org.zalando.nakadi.repository.kafka.SequenceDecoder;
 import org.zalando.nakadi.security.UsernameHasher;
-import org.zalando.nakadi.service.LocalSchemaRegistry;
 import org.zalando.nakadi.service.FeatureToggleService;
+import org.zalando.nakadi.service.LocalSchemaRegistry;
 import org.zalando.nakadi.service.SchemaProviderService;
 import org.zalando.nakadi.service.TestSchemaProviderService;
 import org.zalando.nakadi.util.UUIDGenerator;
+import org.zalando.nakadi.utils.TestUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -50,7 +50,7 @@ public class NakadiKpiPublisherTest {
     private final SchemaProviderService schemaProviderService = new TestSchemaProviderService(localRegistryMock);
     private final UUIDGenerator uuidGenerator = Mockito.mock(UUIDGenerator.class);
     private final UsernameHasher usernameHasher = new UsernameHasher("123");
-    private final NakadiRecordMapper recordMapper = new NakadiRecordMapper(localRegistryMock);
+    private final NakadiRecordMapper recordMapper;
 
     @Captor
     private ArgumentCaptor<String> eventTypeCaptor;
@@ -59,6 +59,10 @@ public class NakadiKpiPublisherTest {
     @Captor
     private ArgumentCaptor<JSONObject> jsonObjectCaptor;
 
+    public NakadiKpiPublisherTest() throws IOException {
+        this.recordMapper = TestUtils.getNakadiRecordMapper();
+    }
+
     @Test
     public void testPublishJsonKPIEventWithFeatureToggleOn() {
         when(featureToggleService.isFeatureEnabled(Feature.KPI_COLLECTION)).thenReturn(true);
@@ -95,8 +99,8 @@ public void testPublishAvroKPIEventWithFeatureToggleOn() throws IOException {
         when(featureToggleService.isFeatureEnabled(Feature.AVRO_FOR_KPI_EVENTS)).thenReturn(true);
 
         // Publish the above KPIEvent and capture it.
-        final Resource eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/");
-        final var localRegistry = new LocalSchemaRegistry(new AvroMapper(), new ObjectMapper(), eventTypeRes);
+        final Resource eventTypeRes = new DefaultResourceLoader().getResource("avro-schema/");
+        final var localRegistry = new LocalSchemaRegistry(eventTypeRes);
         new NakadiKpiPublisher(featureToggleService, jsonProcessor, binaryProcessor, usernameHasher,
                 new EventMetadataTestStub(), new UUIDGenerator(), new TestSchemaProviderService(localRegistry),
                 localRegistryMock, recordMapper)
diff --git a/core-services/src/test/java/org/zalando/nakadi/service/publishing/NakadiRecordMapperTest.java b/core-services/src/test/java/org/zalando/nakadi/service/publishing/NakadiRecordMapperTest.java
deleted file mode 100644
index 6da57b02fb..0000000000
--- a/core-services/src/test/java/org/zalando/nakadi/service/publishing/NakadiRecordMapperTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.zalando.nakadi.service.publishing;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.avro.AvroMapper;
-import com.google.common.primitives.Bytes;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.commons.lang.ArrayUtils;
-import org.junit.Assert;
-import org.junit.Test;
-import org.springframework.core.io.DefaultResourceLoader;
-import org.zalando.nakadi.domain.NakadiRecord;
-import org.zalando.nakadi.domain.VersionedAvroSchema;
-import org.zalando.nakadi.service.LocalSchemaRegistry;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.UUID;
-
-public class NakadiRecordMapperTest {
-
-    @Test
-    public void testFromBytesBatch() throws IOException {
-        final var eventTypeRes = new DefaultResourceLoader().getResource("event-type-schema/");
-        final LocalSchemaRegistry localSchemaRegistry =
-                new LocalSchemaRegistry(new AvroMapper(), new ObjectMapper(), eventTypeRes);
-        final var eid1 = UUID.randomUUID().toString();
-        final var message1 = "First record for testing !!!";
-        final var firstRecord = generateRecord(localSchemaRegistry, eid1, message1);
-        final var eid2 = UUID.randomUUID().toString();
-        final var message2 = "*** Testing twice ***";
-        final var secondRecord = generateRecord(localSchemaRegistry, eid2, message2);
-        final byte[] input = Bytes.concat(firstRecord, secondRecord);
-
-        final NakadiRecordMapper mapper = new NakadiRecordMapper(localSchemaRegistry);
-        final List<NakadiRecord> records = mapper.fromBytesBatch(input);
-
-        Assert.assertEquals(2, records.size());
-
-        Assert.assertNotNull(records.get(0).getMetadata());
-        Assert.assertEquals(eid1, records.get(0).getMetadata().getEid());
-        Assert.assertNotNull(records.get(1).getMetadata());
-        Assert.assertEquals(eid2, records.get(1).getMetadata().getEid());
-
-        Assert.assertEquals(message1, new String(records.get(0).getPayload()));
-        Assert.assertEquals(message2, new String(records.get(1).getPayload()));
-    }
-
-    private byte[] generateRecord(final LocalSchemaRegistry localSchemaRegistry, final String eid, final String payload)
-            throws IOException {
-        final VersionedAvroSchema versionedSchema =
-                localSchemaRegistry.getLatestEventTypeSchemaVersion(LocalSchemaRegistry.METADATA_KEY);
-        final GenericRecord metadata =
-                new GenericData.Record(versionedSchema.getSchema());
-
-        final long someEqualTime = 1643290232172l;
-        metadata.put("occurred_at", someEqualTime);
-        metadata.put("eid", eid);
-        metadata.put("flow_id", "hek");
-        metadata.put("event_type", "nakadi.access.log");
-        metadata.put("partition", "0");
-        metadata.put("received_at", someEqualTime);
-        metadata.put("version", "schemaVersion");
-        metadata.put("published_by", "nakadi-test");
-
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        final GenericDatumWriter eventWriter = new GenericDatumWriter(metadata.getSchema());
-        eventWriter.write(metadata, EncoderFactory.get()
-                .directBinaryEncoder(baos, null));
-
-        final var meta = baos.toByteArray();
-        final var metadataBytes = ArrayUtils.addAll(
-                ByteBuffer.allocate(4).putInt(meta.length).array(), meta);
-        final var payloadBytes = ArrayUtils.addAll(
-                ByteBuffer.allocate(4).putInt(payload.length()).array(), payload.getBytes());
-        return Bytes.concat(new byte[]{versionedSchema.getVersionAsByte()}, metadataBytes, payloadBytes);
-    }
-
-}
diff --git a/core-services/src/test/java/org/zalando/nakadi/service/publishing/check/CheckTest.java b/core-services/src/test/java/org/zalando/nakadi/service/publishing/check/CheckTest.java
index 6dd0c79b6f..316bc7962f 100644
--- a/core-services/src/test/java/org/zalando/nakadi/service/publishing/check/CheckTest.java
+++ b/core-services/src/test/java/org/zalando/nakadi/service/publishing/check/CheckTest.java
@@ -4,7 +4,7 @@
 import org.junit.Assert;
 import org.junit.Test;
 import org.zalando.nakadi.domain.EventType;
-import org.zalando.nakadi.domain.NakadiAvroMetadata;
+import org.zalando.nakadi.domain.NakadiMetadata;
 import org.zalando.nakadi.domain.NakadiRecord;
 import org.zalando.nakadi.domain.NakadiRecordResult;
 
@@ -49,8 +49,8 @@ public NakadiRecordResult.Step getCurrentStep() {
         }
     };
 
-    private NakadiAvroMetadata getTestMetadata() {
-        final NakadiAvroMetadata nakadiMetadata = new NakadiAvroMetadata((byte) 0, null);
+    private NakadiMetadata getTestMetadata() {
+        final NakadiMetadata nakadiMetadata = new NakadiMetadata();
         nakadiMetadata.setEid("12345");
         return nakadiMetadata;
     }
diff --git a/core-services/src/test/java/org/zalando/nakadi/service/publishing/check/EnrichmentCheckTest.java b/core-services/src/test/java/org/zalando/nakadi/service/publishing/check/EnrichmentCheckTest.java
index 31a56a8173..b28c761dd3 100644
--- a/core-services/src/test/java/org/zalando/nakadi/service/publishing/check/EnrichmentCheckTest.java
+++ b/core-services/src/test/java/org/zalando/nakadi/service/publishing/check/EnrichmentCheckTest.java
@@ -8,7 +8,7 @@
 import org.mockito.junit.MockitoJUnitRunner;
 import org.zalando.nakadi.domain.EnrichmentStrategyDescriptor;
 import org.zalando.nakadi.domain.EventType;
-import org.zalando.nakadi.domain.NakadiAvroMetadata;
+import org.zalando.nakadi.domain.NakadiMetadata;
 import org.zalando.nakadi.domain.NakadiRecord;
 import org.zalando.nakadi.domain.NakadiRecordResult;
 import org.zalando.nakadi.enrichment.EnrichmentsRegistry;
@@ -58,7 +58,7 @@ public void testExecuteError() {
                 .thenReturn(List.of(EnrichmentStrategyDescriptor.METADATA_ENRICHMENT));
         final NakadiRecord nakadiRecord = Mockito.mock(NakadiRecord.class);
         final List<NakadiRecord> nakadiRecords = List.of(nakadiRecord);
-        final NakadiAvroMetadata metadata = Mockito.mock(NakadiAvroMetadata.class);
+        final NakadiMetadata metadata = Mockito.mock(NakadiMetadata.class);
         Mockito.when(nakadiRecord.getMetadata()).thenReturn(metadata);
         Mockito.doThrow(EnrichmentException.class).when(metadataEnrichmentStrategy).enrich(nakadiRecord, eventType);
         final List<NakadiRecordResult> recordResults = enrichmentCheck.execute(eventType, nakadiRecords);
diff --git a/core-services/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java b/core-services/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java
index 7309882555..fe2950278b 100644
--- a/core-services/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java
+++ b/core-services/src/test/java/org/zalando/nakadi/service/timeline/TimelineServiceTest.java
@@ -31,7 +31,9 @@
 import org.zalando.nakadi.service.LocalSchemaRegistry;
 import org.zalando.nakadi.service.TestSchemaProviderService;
 import org.zalando.nakadi.utils.EventTypeTestBuilder;
+import org.zalando.nakadi.utils.TestUtils;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
@@ -65,12 +67,13 @@ public class TimelineServiceTest {
     private TimelineService timelineService;
 
     @Before
-    public void setupService() {
+    public void setupService() throws IOException {
        timelineService = new TimelineService(eventTypeCache, storageDbRepository, mock(TimelineSync.class),
                 mock(NakadiSettings.class), timelineDbRepository, topicRepositoryHolder,
                 new TransactionTemplate(mock(PlatformTransactionManager.class)), adminService, featureToggleService,
                 "compacted-storage",
-                new TestSchemaProviderService(localSchemaRegistry), localSchemaRegistry);
+                new TestSchemaProviderService(localSchemaRegistry), localSchemaRegistry,
+                TestUtils.getNakadiRecordMapper());
     }
 
     @Test(expected = NotFoundException.class)
diff --git a/core-services/src/test/resources/test.deserialize.avro.avsc b/core-services/src/test/resources/test.deserialize.avro.avsc
new file mode 100644
index 0000000000..6c65cc481e
--- /dev/null
+++ b/core-services/src/test/resources/test.deserialize.avro.avsc
@@ -0,0 +1,10 @@
+{
+  "type": "record",
+  "name": "testRecord",
+  "fields": [
+    {
+      "type": "string",
+      "name": "foo"
+    }
+  ]
+}
\ No newline at end of file