fix KafkaInputFormat with nested columns #13406

Merged
@@ -30,24 +30,24 @@
import org.apache.druid.indexing.seekablestream.SettableByteEntity;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class KafkaInputReader implements InputEntityReader
{
private static final Logger log = new Logger(KafkaInputReader.class);

private final InputRowSchema inputRowSchema;
private final SettableByteEntity<KafkaRecordEntity> source;
private final Function<KafkaRecordEntity, KafkaHeaderReader> headerParserSupplier;
@@ -85,7 +85,60 @@ public KafkaInputReader(
this.timestampColumnName = timestampColumnName;
}

private List<String> getFinalDimensionList(HashSet<String> newDimensions)
@Override
public CloseableIterator<InputRow> read() throws IOException
{
final KafkaRecordEntity record = source.getEntity();
final Map<String, Object> mergedHeaderMap = new HashMap<>();
if (headerParserSupplier != null) {
KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
List<Pair<String, Object>> headerList = headerParser.read();
for (Pair<String, Object> ele : headerList) {
mergedHeaderMap.put(ele.lhs, ele.rhs);
}
}

// Add the Kafka record timestamp to the merged map; the record timestamp is skipped if the same key already
// exists in the header list
mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());

InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
if (keyParser != null) {
try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
// Key currently only takes the first row and ignores the rest.
if (keyIterator.hasNext()) {
// Return type for the key parser should be of type MapBasedInputRow
// Parsers returning other types are not compatible currently.
MapBasedInputRow keyRow = (MapBasedInputRow) keyIterator.next();
// Add the key to the mergeList only if the key string is not already present
mergedHeaderMap.putIfAbsent(
keyColumnName,
keyRow.getEvent().entrySet().stream().findFirst().get().getValue()
);
}
}
catch (ClassCastException e) {
throw new IOException(
"Unsupported keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows"
);
}
}

// Ignore tombstone records that have null values.
if (record.getRecord().value() != null) {
return buildBlendedRows(valueParser, mergedHeaderMap);
} else {
return buildRowsWithoutValuePayload(mergedHeaderMap);
}
}

@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
}

private List<String> getFinalDimensionList(Set<String> newDimensions)
{
final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
if (!schemaDimensions.isEmpty()) {
@@ -97,11 +150,14 @@ private List<String> getFinalDimensionList(HashSet<String> newDimensions)
}
}

private CloseableIterator<InputRow> buildBlendedRows(InputEntityReader valueParser, Map<String, Object> headerKeyList) throws IOException
private CloseableIterator<InputRow> buildBlendedRows(
InputEntityReader valueParser,
Map<String, Object> headerKeyList
) throws IOException
{
return valueParser.read().map(
r -> {
MapBasedInputRow valueRow;
final MapBasedInputRow valueRow;
try {
// Return type for the value parser should be of type MapBasedInputRow
// Parsers returning other types are not compatible currently.
@@ -113,14 +169,9 @@ private CloseableIterator<InputRow> buildBlendedRows(InputEntityReader valuePars
"Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows"
);
}
Map<String, Object> event = new HashMap<>(headerKeyList);
/* Currently we prefer payload attributes if there is a collision in names.
We can change this behavior in later changes with a config knob. This default
behavior allows easy porting of existing inputFormats to the new one without any changes.
*/
event.putAll(valueRow.getEvent());

HashSet<String> newDimensions = new HashSet<String>(valueRow.getDimensions());

final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
Member:
I see that the ObjectFlattener map has a property in field-discovery mode where the entry-set/key-set doesn't return nested object keys, but doing a get returns the nested k-v pair. Is that left as is intentionally?
Should we maybe let the nested objects get listed as well through the ObjectFlattener map so that they can get consumed by nested columns whenever possible?

Member Author:
Yeah, it is left like that intentionally for now because auto-discovered dimensions are created with a string dimension indexer pointing at them today, which is just going to end up calling Java .toString on these values.

I am actually working on making it possible to use the nested column indexer for all auto-discovered dimensions instead of the string dimension indexer we use today, which will be the appropriate time to add these columns to the list of discovered columns.

Member:
Thanks for the clarification - that makes sense to me.
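For readers following this thread, a minimal editorial sketch of the behavior being discussed (not part of this PR): a lazily flattening map can resolve a nested path through get() even though keySet()/entrySet() only list top-level fields. The LazyFlattenedMapDemo class and the dotted-path lookup below are hypothetical stand-ins for the ObjectFlatteners-backed map, not Druid code.

import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class LazyFlattenedMapDemo
{
  // Hypothetical stand-in for a flattener-backed map: keySet()/entrySet() only expose
  // top-level fields, but get() can still resolve a dotted path into the nested structure.
  static Map<String, Object> lazyFlattened(Map<String, Object> nested)
  {
    return new AbstractMap<String, Object>()
    {
      @Override
      public Object get(Object key)
      {
        Object current = nested;
        for (String part : ((String) key).split("\\.")) {
          if (!(current instanceof Map)) {
            return null;
          }
          current = ((Map<?, ?>) current).get(part);
        }
        return current;
      }

      @Override
      public Set<Entry<String, Object>> entrySet()
      {
        return nested.entrySet();
      }
    };
  }

  public static void main(String[] args)
  {
    Map<String, Object> inner = new LinkedHashMap<>();
    inner.put("mg", 1L);
    Map<String, Object> nested = new LinkedHashMap<>();
    nested.put("foo", "x");
    nested.put("o", inner);

    Map<String, Object> flattened = lazyFlattened(nested);
    System.out.println(flattened.keySet());    // [foo, o] -- the nested path "o.mg" is not listed
    System.out.println(flattened.get("o.mg")); // 1 -- but a direct get can still reach it
  }
}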

final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
newDimensions.addAll(headerKeyList.keySet());
// Remove the dummy timestamp added in KafkaInputFormat
newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
Expand All @@ -136,60 +187,70 @@ private CloseableIterator<InputRow> buildBlendedRows(InputEntityReader valuePars

private CloseableIterator<InputRow> buildRowsWithoutValuePayload(Map<String, Object> headerKeyList)
{
HashSet<String> newDimensions = new HashSet<String>(headerKeyList.keySet());
InputRow row = new MapBasedInputRow(
final InputRow row = new MapBasedInputRow(
inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
getFinalDimensionList(newDimensions),
getFinalDimensionList(headerKeyList.keySet()),
headerKeyList
);
List<InputRow> rows = Collections.singletonList(row);
final List<InputRow> rows = Collections.singletonList(row);
return CloseableIterators.withEmptyBaggage(rows.iterator());
}

@Override
public CloseableIterator<InputRow> read() throws IOException
/**
* Builds a map that blends two {@link Map}, presenting the combined keyset of both maps, and preferring to read
* from the first map and falling back to the second map if the value is not present.
*
* This strategy is used rather than just copying the values of the keyset into a new map so that any 'flattening'
* machinery (such as {@link Map} created by {@link org.apache.druid.java.util.common.parsers.ObjectFlatteners}) is
* still in place to be lazily evaluated instead of eagerly copying.
*/
private static Map<String, Object> buildBlendedEventMap(Map<String, Object> map, Map<String, Object> fallback)
{
KafkaRecordEntity record = source.getEntity();
Map<String, Object> mergeMap = new HashMap<>();
if (headerParserSupplier != null) {
KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
List<Pair<String, Object>> headerList = headerParser.read();
for (Pair<String, Object> ele : headerList) {
mergeMap.put(ele.lhs, ele.rhs);
}
}

// Add kafka record timestamp to the mergelist, we will skip record timestamp if the same key exists already in the header list
mergeMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
final Set<String> keySet = new HashSet<>(fallback.keySet());
keySet.addAll(map.keySet());

InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
if (keyParser != null) {
try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
// Key currently only takes the first row and ignores the rest.
if (keyIterator.hasNext()) {
// Return type for the key parser should be of type MapBasedInputRow
// Parsers returning other types are not compatible currently.
MapBasedInputRow keyRow = (MapBasedInputRow) keyIterator.next();
// Add the key to the mergeList only if the key string is not already present
mergeMap.putIfAbsent(keyColumnName, keyRow.getEvent().entrySet().stream().findFirst().get().getValue());
}
return new AbstractMap<String, Object>()
{
@Override
public Object get(Object key)
{
return map.getOrDefault((String) key, fallback.get(key));
}
catch (ClassCastException e) {
throw new IOException("Unsupported input format in keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows");

@Override
public Set<String> keySet()
{
return keySet;
}
}

// Ignore tombstone records that have null values.
if (record.getRecord().value() != null) {
return buildBlendedRows(valueParser, mergeMap);
} else {
return buildRowsWithoutValuePayload(mergeMap);
}
}
@Override
public Set<Entry<String, Object>> entrySet()
{
return keySet().stream()
.map(
field -> new Entry<String, Object>()
{
@Override
public String getKey()
{
return field;
}

@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
@Override
public Object getValue()
{
return get(field);
}

@Override
public Object setValue(final Object value)
{
throw new UnsupportedOperationException();
}
}
)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
};
}
}
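As a side note, here is a minimal sketch of the lookup rule that the buildBlendedEventMap javadoc above describes: the blended map prefers the value payload and falls back to the header/key map only when the payload has no entry for a column. The column names and values below are illustrative and not taken from this PR's tests.

import java.util.HashMap;
import java.util.Map;

public class BlendedLookupDemo
{
  public static void main(String[] args)
  {
    // Stand-in for the flattened Kafka value payload.
    Map<String, Object> valueEvent = new HashMap<>();
    valueEvent.put("foo", "x");
    valueEvent.put("kafka.key", "from-payload");

    // Stand-in for the header/key/timestamp map assembled in read().
    Map<String, Object> headerAndKey = new HashMap<>();
    headerAndKey.put("kafka.key", "from-record-key");
    headerAndKey.put("kafka.timestamp", 1624492800000L);

    // Same rule as the blended map's get(): prefer the payload, fall back to headers.
    for (String column : new String[]{"foo", "kafka.key", "kafka.timestamp"}) {
      Object blended = valueEvent.getOrDefault(column, headerAndKey.get(column));
      System.out.println(column + " -> " + blended);
    }
    // Prints:
    // foo -> x
    // kafka.key -> from-payload          (payload wins on a name collision)
    // kafka.timestamp -> 1624492800000   (falls back to the header map)
  }
}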
@@ -22,12 +22,12 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
@@ -205,6 +205,7 @@ public void testWithHeaderKeyAndValue() throws IOException
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));

// Header verification
Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
@@ -342,14 +343,14 @@ public byte[] value()
while (iterator.hasNext()) {

final InputRow row = iterator.next();
final MapBasedInputRow mrow = (MapBasedInputRow) row;
// Payload verifications
Assert.assertEquals(DateTimes.of("2021-06-24"), row.getTimestamp());
Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));

// Header verification
Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding")));
@@ -444,6 +445,7 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
numActualIterations++;
}

@@ -521,6 +523,7 @@ public void testWithMultipleMixedRecords() throws IOException
Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o"));
Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index")));

