Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.ByteBufferInputRowParser;
import org.apache.druid.data.input.InputRow;
import org.apache.avro.generic.GenericRecord;
import org.apache.druid.data.input.avro.AvroBytesDecoder;
import org.apache.druid.data.input.avro.AvroParsers;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;

import javax.validation.constraints.NotNull;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
Expand All @@ -33,44 +35,103 @@
* This class is copied from druid source code
* in order to avoid adding additional dependencies on druid-indexing-service.
*/
public class AvroStreamInputRowParser implements ByteBufferInputRowParser {
public class AvroStreamInputRowParser implements ByteBufferInputRowParser
{
// Spec describing timestamp/dimension extraction; required, supplied via JSON config.
private final ParseSpec parseSpec;
// Normalized in the constructor: null is coerced to false, so this is never null after construction.
private final Boolean binaryAsString;
// Normalized in the constructor: null is coerced to false, so this is never null after construction.
private final Boolean extractUnionsByType;
// Decodes the raw ByteBuffer payload into an Avro GenericRecord; required.
private final AvroBytesDecoder avroBytesDecoder;
// Flattener built from parseSpec; turns a GenericRecord into a flat map for row parsing.
private final ObjectFlattener<GenericRecord> avroFlattener;
// Delegate parser that converts the flattened map into InputRows.
private final MapInputRowParser mapParser;

@JsonCreator public AvroStreamInputRowParser(@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder) {
/**
 * JSON-deserialization constructor.
 *
 * @param parseSpec           required spec for timestamp/dimension extraction
 * @param avroBytesDecoder    required decoder from raw bytes to Avro records
 * @param binaryAsString      optional; null is treated as false
 * @param extractUnionsByType optional; null is treated as false
 */
@JsonCreator
public AvroStreamInputRowParser(
    @JsonProperty("parseSpec") ParseSpec parseSpec,
    @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder,
    @JsonProperty("binaryAsString") Boolean binaryAsString,
    @JsonProperty("extractUnionsByType") Boolean extractUnionsByType
)
{
  // Fail fast when either required piece is missing from the JSON config.
  this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
  this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
  // Optional flags: an absent (null) value behaves exactly like an explicit false.
  this.binaryAsString = Boolean.TRUE.equals(binaryAsString);
  this.extractUnionsByType = Boolean.TRUE.equals(extractUnionsByType);
  this.mapParser = new MapInputRowParser(parseSpec);
  this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, this.binaryAsString, this.extractUnionsByType);
}

@NotNull @Override public List<InputRow> parseBatch(ByteBuffer input) {
throw new UnsupportedOperationException("This class is only used for JSON serde");
/**
 * Decodes one raw message into Druid input rows: the decoder turns the bytes
 * into an Avro record, which is then flattened and parsed into rows.
 */
@Override
public List<InputRow> parseBatch(ByteBuffer input)
{
  final GenericRecord record = avroBytesDecoder.parse(input);
  return AvroParsers.parseGenericRecord(record, mapParser, avroFlattener);
}

@JsonProperty @Override public ParseSpec getParseSpec() {
// Exposed for JSON serialization; returns the spec supplied at construction (never null).
@JsonProperty
@Override
public ParseSpec getParseSpec()
{
return parseSpec;
}

@JsonProperty public AvroBytesDecoder getAvroBytesDecoder() {
// Exposed for JSON serialization; returns the decoder supplied at construction (never null).
@JsonProperty
public AvroBytesDecoder getAvroBytesDecoder()
{
return avroBytesDecoder;
}

@Override public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec) {
return new AvroStreamInputRowParser(parseSpec, avroBytesDecoder);
// Exposed for JSON serialization; normalized in the constructor so this is never null.
@JsonProperty
public Boolean getBinaryAsString()
{
return binaryAsString;
}

@Override public boolean equals(final Object o) {
// Exposed for JSON serialization; normalized in the constructor so this is never null.
@JsonProperty
public Boolean isExtractUnionsByType()
{
return extractUnionsByType;
}

/**
 * Returns a copy of this parser using the given parse spec; every other
 * setting (decoder and both flags) is carried over unchanged.
 */
@Override
public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec)
{
  return new AvroStreamInputRowParser(parseSpec, avroBytesDecoder, binaryAsString, extractUnionsByType);
}

/**
 * Equality over all serialized configuration: parseSpec, avroBytesDecoder,
 * binaryAsString and extractUnionsByType. Consistent with {@link #hashCode()}.
 */
@Override
public boolean equals(final Object o)
{
  if (this == o) {
    return true;
  }
  if (o == null || getClass() != o.getClass()) {
    return false;
  }
  final AvroStreamInputRowParser that = (AvroStreamInputRowParser) o;
  // Single return over all four fields; the stale pre-change return line that
  // compared only parseSpec/avroBytesDecoder has been removed.
  return Objects.equals(parseSpec, that.parseSpec) &&
         Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
         Objects.equals(binaryAsString, that.binaryAsString) &&
         Objects.equals(extractUnionsByType, that.extractUnionsByType);
}

@Override
public int hashCode()
{
return Objects.hash(parseSpec, avroBytesDecoder, binaryAsString, extractUnionsByType);
}

@Override public int hashCode() {
return Objects.hash(parseSpec, avroBytesDecoder);
/**
 * Debug representation listing all serialized configuration fields.
 */
@Override
public String toString()
{
  final StringBuilder sb = new StringBuilder("AvroStreamInputRowParser{");
  sb.append("parseSpec=").append(parseSpec);
  sb.append(", binaryAsString=").append(binaryAsString);
  sb.append(", extractUnionsByType=").append(extractUnionsByType);
  sb.append(", avroBytesDecoder=").append(avroBytesDecoder);
  sb.append('}');
  return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;

Expand All @@ -31,20 +33,66 @@
* This class is copied from druid source code
* in order to avoid adding additional dependencies on druid-indexing-service.
*/
public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig {
@JsonCreator
public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig
{
/**
 * Programmatic constructor: forwards every tuning parameter to the
 * SeekableStreamIndexTaskTuningConfig superclass. All parameters are nullable;
 * defaulting of null values is handled by the superclass (not visible here).
 */
public KafkaIndexTaskTuningConfig(
@Nullable AppendableIndexSpec appendableIndexSpec,
@Nullable Integer maxRowsInMemory,
@Nullable Long maxBytesInMemory,
@Nullable Boolean skipBytesInMemoryOverheadCheck,
@Nullable Integer maxRowsPerSegment,
@Nullable Long maxTotalRows,
@Nullable Period intermediatePersistPeriod,
@Nullable File basePersistDirectory,
@Nullable Integer maxPendingPersists,
@Nullable IndexSpec indexSpec,
@Nullable IndexSpec indexSpecForIntermediatePersists,
@Nullable Boolean reportParseExceptions,
@Nullable Long handoffConditionTimeout,
@Nullable Boolean resetOffsetAutomatically,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@Nullable Period intermediateHandoffPeriod,
@Nullable Boolean logParseExceptions,
@Nullable Integer maxParseExceptions,
@Nullable Integer maxSavedParseExceptions
)
{
super(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
skipBytesInMemoryOverheadCheck,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
basePersistDirectory,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
// NOTE(review): hard-coded boolean positioned between resetOffsetAutomatically and
// segmentWriteOutMediumFactory in the super signature — presumably a flag this task
// never enables; confirm against SeekableStreamIndexTaskTuningConfig's parameter list.
false,
segmentWriteOutMediumFactory,
intermediateHandoffPeriod,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
);
}

@JsonCreator
private KafkaIndexTaskTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
@Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
Expand All @@ -53,22 +101,23 @@ public KafkaIndexTaskTuningConfig(
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
) {
super(
)
{
this(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
skipBytesInMemoryOverheadCheck,
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
basePersistDirectory,
null,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
false,
segmentWriteOutMediumFactory,
intermediateHandoffPeriod,
logParseExceptions,
Expand All @@ -78,18 +127,20 @@ public KafkaIndexTaskTuningConfig(
}

@Override
public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) {
public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
{
return new KafkaIndexTaskTuningConfig(
getAppendableIndexSpec(),
getMaxRowsInMemory(),
getMaxBytesInMemory(),
isSkipBytesInMemoryOverheadCheck(),
getMaxRowsPerSegment(),
getMaxTotalRows(),
getIntermediatePersistPeriod(),
dir,
getMaxPendingPersists(),
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
true,
isReportParseExceptions(),
getHandoffConditionTimeout(),
isResetOffsetAutomatically(),
Expand All @@ -103,26 +154,27 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) {


@Override
public String toString() {
public String toString()
{
return "KafkaIndexTaskTuningConfig{" +
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", maxBytesInMemory=" + getMaxBytesInMemory() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() +
", reportParseExceptions=" + isReportParseExceptions() +
", handoffConditionTimeout=" + getHandoffConditionTimeout() +
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", logParseExceptions=" + isLogParseExceptions() +
", maxParseExceptions=" + getMaxParseExceptions() +
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
'}';
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", maxBytesInMemory=" + getMaxBytesInMemory() +
", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() +
", reportParseExceptions=" + isReportParseExceptions() +
", handoffConditionTimeout=" + getHandoffConditionTimeout() +
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", logParseExceptions=" + isLogParseExceptions() +
", maxParseExceptions=" + getMaxParseExceptions() +
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
'}';
}

}
Loading
Loading