Skip to content

Commit

Permalink
BE: Closes kafbat#71 Messages: Show headers duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
PopClom committed Oct 28, 2024
1 parent 0ad8695 commit 58b7833
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 20 deletions.
2 changes: 1 addition & 1 deletion api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ private static CelCompiler createCompiler() {
"timestampMs", SimpleType.INT,
"keyAsText", SimpleType.STRING,
"valueAsText", SimpleType.STRING,
"headers", MapType.create(SimpleType.STRING, SimpleType.STRING),
"headers", MapType.create(SimpleType.STRING, SimpleType.DYN),
"key", SimpleType.DYN,
"value", SimpleType.DYN
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import lombok.RequiredArgsConstructor;
Expand Down Expand Up @@ -63,14 +65,16 @@ private static TimestampTypeEnum mapToTimestampType(TimestampType timestampType)
}

private void fillHeaders(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {
Map<String, String> headers = new HashMap<>();
Map<String, List<String>> headersMap = new HashMap<>();
rec.headers().iterator()
.forEachRemaining(header ->
headers.put(
header.key(),
header.value() != null ? new String(header.value()) : null
));
message.setHeaders(headers);
.forEachRemaining(header -> {
String key = header.key();
String value = header.value() != null ? new String(header.value()) : null;
headersMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
});
Map<String, Object> finalHeadersMap = new HashMap<>();
headersMap.forEach((key, values) -> finalHeadersMap.put(key, values.size() == 1 ? values.get(0) : values));
message.setHeaders(finalHeadersMap);
}

private void fillKey(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec) {
Expand Down
21 changes: 18 additions & 3 deletions api/src/main/java/io/kafbat/ui/serdes/ProducerRecordCreator.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.kafbat.ui.serdes;

import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.serde.api.Serde;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
Expand All @@ -19,7 +21,7 @@ public ProducerRecord<byte[], byte[]> create(String topic,
@Nullable Integer partition,
@Nullable String key,
@Nullable String value,
@Nullable Map<String, String> headers) {
@Nullable Map<String, Object> headers) {
return new ProducerRecord<>(
topic,
partition,
Expand All @@ -29,10 +31,23 @@ public ProducerRecord<byte[], byte[]> create(String topic,
);
}

private Iterable<Header> createHeaders(Map<String, String> clientHeaders) {
private Iterable<Header> createHeaders(Map<String, Object> clientHeaders) {
RecordHeaders headers = new RecordHeaders();
clientHeaders.forEach((k, v) -> headers.add(new RecordHeader(k, v.getBytes())));
clientHeaders.forEach((k, v) -> {
if (v instanceof List<?> valueList) {
valueList.forEach(value -> headers.add(new RecordHeader(k, valueToBytes(value))));
} else {
headers.add(new RecordHeader(k, valueToBytes(v)));
}
});
return headers;
}

private byte[] valueToBytes(Object value) {
if (value instanceof List<?> || value instanceof Map<?, ?>) {
throw new ValidationException("Header values can only be string or list of strings");
}
return value != null ? String.valueOf(value).getBytes() : null;
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package io.kafbat.ui.serdes;

import static io.kafbat.ui.serde.api.DeserializeResult.Type.STRING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.kafbat.ui.model.TopicMessageDTO;
import io.kafbat.ui.serde.api.DeserializeResult;
import io.kafbat.ui.serde.api.Serde;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand All @@ -22,9 +26,51 @@ void dataMaskingAppliedOnDeserializedMessage() {
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());

var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
recordDeser.deserialize(new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes())));
recordDeser.deserialize(record());

verify(maskerMock).apply(any(TopicMessageDTO.class));
}

@Test
void deserializeWithMultipleHeaderValues() {
UnaryOperator<TopicMessageDTO> maskerMock = mock();
when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0));
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());

var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
ConsumerRecord<Bytes, Bytes> record = record();
record.headers().add("headerKey", "headerValue1".getBytes());
record.headers().add("headerKey", "headerValue2".getBytes());
TopicMessageDTO message = recordDeser.deserialize(record);

Map<String, Object> headers = message.getHeaders();
assertEquals(1, headers.size());
assertTrue(headers.get("headerKey") instanceof List);
assertEquals(List.of("headerValue1", "headerValue2"), headers.get("headerKey"));
}

@Test
void deserializeWithMixedSingleAndMultipleHeaderValues() {
UnaryOperator<TopicMessageDTO> maskerMock = mock();
when(maskerMock.apply(any(TopicMessageDTO.class))).thenAnswer(invocation -> invocation.getArgument(0));
Serde.Deserializer deser = (headers, data) -> new DeserializeResult("test", STRING, Map.of());

var recordDeser = new ConsumerRecordDeserializer("test", deser, "test", deser, "test", deser, deser, maskerMock);
ConsumerRecord<Bytes, Bytes> record = record();
record.headers().add("headerKey1", "singleValue".getBytes());
record.headers().add("headerKey2", "multiValue1".getBytes());
record.headers().add("headerKey2", "multiValue2".getBytes());
TopicMessageDTO message = recordDeser.deserialize(record);

Map<String, Object> headers = message.getHeaders();
assertEquals("singleValue", headers.get("headerKey1"));
assertTrue(headers.get("headerKey2") instanceof List);
assertEquals(2, ((List<?>) headers.get("headerKey2")).size());
assertEquals(List.of("multiValue1", "multiValue2"), headers.get("headerKey2"));
}

private ConsumerRecord<Bytes, Bytes> record() {
return new ConsumerRecord<>("t", 1, 1L, Bytes.wrap("t".getBytes()), Bytes.wrap("t".getBytes()));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.kafbat.ui.serdes;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;

import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.serde.api.Serde;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.jupiter.api.Test;

class ProducerRecordCreatorTest {

@Test
void createWithHeaders() {
Serde.Serializer keySerializer = mock(Serde.Serializer.class);
Serde.Serializer valueSerializer = mock(Serde.Serializer.class);

ProducerRecordCreator recordCreator = new ProducerRecordCreator(keySerializer, valueSerializer);
Map<String, Object> headersMap = Map.of(
"headerKey1", "headerValue1",
"headerKey2", List.of("headerValue2", "headerValue3")
);
ProducerRecord<byte[], byte[]> record = recordCreator.create("topic", 1, "key", "value", headersMap);

assertNotNull(record.headers());
assertEquals(3, record.headers().toArray().length);
assertThat(record.headers()).containsExactlyInAnyOrder(
new RecordHeader("headerKey1", "headerValue1".getBytes()),
new RecordHeader("headerKey2", "headerValue2".getBytes()),
new RecordHeader("headerKey2", "headerValue3".getBytes())
);
}

@Test
void createWithInvalidHeaderValue() {
Serde.Serializer keySerializer = mock(Serde.Serializer.class);
Serde.Serializer valueSerializer = mock(Serde.Serializer.class);

ProducerRecordCreator recordCreator = new ProducerRecordCreator(keySerializer, valueSerializer);
Map<String, Object> invalidHeaders = Map.of("headerKey", Map.of("invalid", "value"));

assertThrows(ValidationException.class, () ->
recordCreator.create("topic", 1, "key", "value", invalidHeaders));
}
}
9 changes: 6 additions & 3 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2919,8 +2919,9 @@ components:
type: string
headers:
type: object
description: header values can be string or list of strings (for keys with multiple values)
additionalProperties:
type: string
type: object
partition:
type: integer
offset:
Expand Down Expand Up @@ -2948,8 +2949,9 @@ components:
nullable: true
headers:
type: object
description: header values can be string or list of strings (for keys with multiple values)
additionalProperties:
type: string
type: object
content:
type: string
nullable: true
Expand Down Expand Up @@ -3038,8 +3040,9 @@ components:
type: string
headers:
type: object
description: header values can be string or list of strings (for keys with multiple values)
additionalProperties:
type: string
type: object
content:
type: string
keyFormat:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ const InfoModal: React.FC<InfoModalProps> = ({ toggleIsOpen }) => {
</S.ListItem>
<S.ListItem>
<code>
record.headers.size() == 1 && !has(record.headers.k1) &&
record.headers[&apos;k2&apos;] == &apos;v2&apos;
</code>
&quot;v1&quot; in (type(record.headers.h1) == list ?
record.headers.h1 : [record.headers.h1])
</code> - in case a header can hold either a single value (string) or
multiple values (list of strings)
</S.ListItem>
</ol>
<Flexbox justifyContent="center" margin="20px 0 0 0">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Tab = 'key' | 'content' | 'headers';
export interface MessageContentProps {
messageKey?: string;
messageContent?: string;
headers?: { [key: string]: string | undefined };
headers?: { [key: string]: string | string[] | undefined };
timestamp?: Date;
timestampType?: TopicMessageTimestampTypeEnum;
keySize?: number;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const setupWrapper = (props?: Partial<MessageContentProps>) => {
<MessageContent
messageKey='"test-key"'
messageContent='{"data": "test"}'
headers={{ header: 'test' }}
headers={{ header1: 'test', header2: ['value1', 'value2'] }}
timestamp={new Date(0)}
timestampType={TopicMessageTimestampTypeEnum.CREATE_TIME}
keySerde="SchemaRegistry"
Expand Down

0 comments on commit 58b7833

Please sign in to comment.