diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java
index b689b63031a6..119fbbf4b574 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/AvroStreamInputRowParser.java
@@ -20,11 +20,13 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.druid.data.input.ByteBufferInputRowParser;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.avro.AvroBytesDecoder;
+import org.apache.druid.data.input.avro.AvroParsers;
+import org.apache.druid.data.input.impl.MapInputRowParser;
 import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;

-import javax.validation.constraints.NotNull;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Objects;
@@ -33,33 +35,76 @@
  * This class is copied from druid source code
  * in order to avoid adding additional dependencies on druid-indexing-service.
  */
-public class AvroStreamInputRowParser implements ByteBufferInputRowParser {
+public class AvroStreamInputRowParser implements ByteBufferInputRowParser
+{
   private final ParseSpec parseSpec;
+  private final Boolean binaryAsString;
+  private final Boolean extractUnionsByType;
   private final AvroBytesDecoder avroBytesDecoder;
+  private final ObjectFlattener<GenericRecord> avroFlattener;
+  private final MapInputRowParser mapParser;

-  @JsonCreator public AvroStreamInputRowParser(@JsonProperty("parseSpec") ParseSpec parseSpec,
-      @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder) {
+  @JsonCreator
+  public AvroStreamInputRowParser(
+      @JsonProperty("parseSpec") ParseSpec parseSpec,
+      @JsonProperty("avroBytesDecoder") AvroBytesDecoder avroBytesDecoder,
+      @JsonProperty("binaryAsString") Boolean binaryAsString,
+      @JsonProperty("extractUnionsByType") Boolean extractUnionsByType
+  )
+  {
     this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
     this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
+    this.binaryAsString = binaryAsString != null && binaryAsString;
+    this.extractUnionsByType = extractUnionsByType != null && extractUnionsByType;
+    this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, this.binaryAsString, this.extractUnionsByType);
+    this.mapParser = new MapInputRowParser(parseSpec);
   }

-  @NotNull @Override public List<InputRow> parseBatch(ByteBuffer input) {
-    throw new UnsupportedOperationException("This class is only used for JSON serde");
+  @Override
+  public List<InputRow> parseBatch(ByteBuffer input)
+  {
+    return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), mapParser, avroFlattener);
   }

-  @JsonProperty @Override public ParseSpec getParseSpec() {
+  @JsonProperty
+  @Override
+  public ParseSpec getParseSpec()
+  {
     return parseSpec;
   }

-  @JsonProperty public AvroBytesDecoder getAvroBytesDecoder() {
+  @JsonProperty
+  public AvroBytesDecoder getAvroBytesDecoder()
+  {
     return avroBytesDecoder;
   }

-  @Override public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec) {
-    return new AvroStreamInputRowParser(parseSpec, avroBytesDecoder);
+  @JsonProperty
+  public Boolean getBinaryAsString()
+  {
+    return binaryAsString;
   }

-  @Override public boolean equals(final Object o) {
+  @JsonProperty
+  public Boolean isExtractUnionsByType()
+  {
+    return extractUnionsByType;
+  }
+
+  @Override
+  public ByteBufferInputRowParser withParseSpec(ParseSpec parseSpec)
+  {
+    return new AvroStreamInputRowParser(
+        parseSpec,
+        avroBytesDecoder,
+        binaryAsString,
+        extractUnionsByType
+    );
+  }
+
+  @Override
+  public boolean equals(final Object o)
+  {
     if (this == o) {
       return true;
     }
@@ -67,10 +112,26 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser {
       return false;
     }
     final AvroStreamInputRowParser that = (AvroStreamInputRowParser) o;
-    return Objects.equals(parseSpec, that.parseSpec) && Objects.equals(avroBytesDecoder, that.avroBytesDecoder);
+    return Objects.equals(parseSpec, that.parseSpec) &&
+        Objects.equals(avroBytesDecoder, that.avroBytesDecoder) &&
+        Objects.equals(binaryAsString, that.binaryAsString) &&
+        Objects.equals(extractUnionsByType, that.extractUnionsByType);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(parseSpec, avroBytesDecoder, binaryAsString, extractUnionsByType);
   }

-  @Override public int hashCode() {
-    return Objects.hash(parseSpec, avroBytesDecoder);
+  @Override
+  public String toString()
+  {
+    return "AvroStreamInputRowParser{" +
+        "parseSpec=" + parseSpec +
+        ", binaryAsString=" + binaryAsString +
+        ", extractUnionsByType=" + extractUnionsByType +
+        ", avroBytesDecoder=" + avroBytesDecoder +
+        '}';
   }
 }
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java
index 92800cef5fd9..75a52d8e2b5e 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaIndexTaskTuningConfig.java
@@ -20,7 +20,9 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
@@ -31,20 +33,66 @@
  * This class is copied from druid source code
  * in order to avoid adding additional dependencies on druid-indexing-service.
  */
-public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig {
-  @JsonCreator
+public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig
+{
   public KafkaIndexTaskTuningConfig(
+      @Nullable AppendableIndexSpec appendableIndexSpec,
+      @Nullable Integer maxRowsInMemory,
+      @Nullable Long maxBytesInMemory,
+      @Nullable Boolean skipBytesInMemoryOverheadCheck,
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Long maxTotalRows,
+      @Nullable Period intermediatePersistPeriod,
+      @Nullable File basePersistDirectory,
+      @Nullable Integer maxPendingPersists,
+      @Nullable IndexSpec indexSpec,
+      @Nullable IndexSpec indexSpecForIntermediatePersists,
+      @Nullable Boolean reportParseExceptions,
+      @Nullable Long handoffConditionTimeout,
+      @Nullable Boolean resetOffsetAutomatically,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @Nullable Period intermediateHandoffPeriod,
+      @Nullable Boolean logParseExceptions,
+      @Nullable Integer maxParseExceptions,
+      @Nullable Integer maxSavedParseExceptions
+  )
+  {
+    super(
+        appendableIndexSpec,
+        maxRowsInMemory,
+        maxBytesInMemory,
+        skipBytesInMemoryOverheadCheck,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        basePersistDirectory,
+        maxPendingPersists,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically,
+        false,
+        segmentWriteOutMediumFactory,
+        intermediateHandoffPeriod,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions
+    );
+  }
+
+  @JsonCreator
+  private KafkaIndexTaskTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+      @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
       @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
       @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
       @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
-      // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
- @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout, @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically, @@ -53,22 +101,23 @@ public KafkaIndexTaskTuningConfig( @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions - ) { - super( + ) + { + this( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, + skipBytesInMemoryOverheadCheck, maxRowsPerSegment, maxTotalRows, intermediatePersistPeriod, - basePersistDirectory, + null, maxPendingPersists, indexSpec, indexSpecForIntermediatePersists, - true, reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - false, segmentWriteOutMediumFactory, intermediateHandoffPeriod, logParseExceptions, @@ -78,10 +127,13 @@ public KafkaIndexTaskTuningConfig( } @Override - public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) { + public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) + { return new KafkaIndexTaskTuningConfig( + getAppendableIndexSpec(), getMaxRowsInMemory(), getMaxBytesInMemory(), + isSkipBytesInMemoryOverheadCheck(), getMaxRowsPerSegment(), getMaxTotalRows(), getIntermediatePersistPeriod(), @@ -89,7 +141,6 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) { getMaxPendingPersists(), getIndexSpec(), getIndexSpecForIntermediatePersists(), - true, isReportParseExceptions(), getHandoffConditionTimeout(), isResetOffsetAutomatically(), @@ -103,26 +154,27 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) { @Override - public String toString() { + public String toString() + { return "KafkaIndexTaskTuningConfig{" + - "maxRowsInMemory=" + getMaxRowsInMemory() + - ", maxRowsPerSegment=" + getMaxRowsPerSegment() + - ", maxTotalRows=" + getMaxTotalRows() + - ", maxBytesInMemory=" + getMaxBytesInMemory() + - ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + - ", basePersistDirectory=" + getBasePersistDirectory() + - ", maxPendingPersists=" + getMaxPendingPersists() + - ", indexSpec=" + getIndexSpec() + - ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() + - ", reportParseExceptions=" + isReportParseExceptions() + - ", handoffConditionTimeout=" + getHandoffConditionTimeout() + - ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + - ", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() + - ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + - ", logParseExceptions=" + isLogParseExceptions() + - ", maxParseExceptions=" + getMaxParseExceptions() + - ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + - '}'; + "maxRowsInMemory=" + getMaxRowsInMemory() + + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + + ", maxTotalRows=" + getMaxTotalRows() + + ", maxBytesInMemory=" + getMaxBytesInMemory() + + ", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() + + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + + ", maxPendingPersists=" + getMaxPendingPersists() + + ", indexSpec=" + getIndexSpec() + + ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() + + ", reportParseExceptions=" + isReportParseExceptions() + + ", 
handoffConditionTimeout=" + getHandoffConditionTimeout() + + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + + ", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() + + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + + ", logParseExceptions=" + isLogParseExceptions() + + ", maxParseExceptions=" + getMaxParseExceptions() + + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + + '}'; } } diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java index 9f5fc48e6e6d..f2d541964043 100644 --- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java +++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorIOConfig.java @@ -19,220 +19,134 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides; +import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.java.util.common.StringUtils; -import org.joda.time.Duration; +import org.joda.time.DateTime; import org.joda.time.Period; +import javax.annotation.Nullable; import java.util.Map; /** * This class is copied from druid source code * in order to avoid adding additional dependencies on druid-indexing-service. 
  */
-public class KafkaSupervisorIOConfig {
+public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
+{
+  public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider";
   public static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
-
-  private final String topic;
-  private final Integer replicas;
-  private final Integer taskCount;
-  private final Duration taskDuration;
-  private final Map<String, String> consumerProperties;
-  private final Duration startDelay;
-  private final Duration period;
-  private final boolean useEarliestOffset;
-  private final Duration completionTimeout;
-  @SuppressWarnings({ "OptionalUsedAsFieldOrParameterType", "Guava" }) private final Optional<Duration>
-      lateMessageRejectionPeriod;
-  @SuppressWarnings({ "OptionalUsedAsFieldOrParameterType", "Guava" }) private final Optional<Duration>
-      earlyMessageRejectionPeriod;
-  private final boolean skipOffsetGaps;
-
-  @JsonCreator public KafkaSupervisorIOConfig(@JsonProperty("topic") String topic,
+  public static final String TRUST_STORE_PASSWORD_KEY = "ssl.truststore.password";
+  public static final String KEY_STORE_PASSWORD_KEY = "ssl.keystore.password";
+  public static final String KEY_PASSWORD_KEY = "ssl.key.password";
+  public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 100;
+
+  private final Map<String, Object> consumerProperties;
+  private final long pollTimeout;
+  private final KafkaConfigOverrides configOverrides;
+
+  @JsonCreator
+  public KafkaSupervisorIOConfig(
+      @JsonProperty("topic") String topic,
+      @JsonProperty("inputFormat") InputFormat inputFormat,
       @JsonProperty("replicas") Integer replicas,
       @JsonProperty("taskCount") Integer taskCount,
       @JsonProperty("taskDuration") Period taskDuration,
-      @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
+      @JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
+      @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
+      @JsonProperty("pollTimeout") Long pollTimeout,
       @JsonProperty("startDelay") Period startDelay,
       @JsonProperty("period") Period period,
       @JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
       @JsonProperty("completionTimeout") Period completionTimeout,
       @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod,
       @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
-      @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps) {
-    this.topic = Preconditions.checkNotNull(topic, "topic");
-    this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
-    Preconditions.checkNotNull(consumerProperties.get(BOOTSTRAP_SERVERS_KEY),
-        StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY));
-
-    this.replicas = replicas != null ? replicas : 1;
-    this.taskCount = taskCount != null ? taskCount : 1;
-    this.taskDuration = defaultDuration(taskDuration, "PT1H");
-    this.startDelay = defaultDuration(startDelay, "PT5S");
-    this.period = defaultDuration(period, "PT30S");
-    this.useEarliestOffset = useEarliestOffset != null ? useEarliestOffset : false;
-    this.completionTimeout = defaultDuration(completionTimeout, "PT30M");
-    //noinspection Guava
-    this.lateMessageRejectionPeriod =
-        lateMessageRejectionPeriod == null ?
-            Optional.absent() :
-            Optional.of(lateMessageRejectionPeriod.toStandardDuration());
-    //noinspection Guava
-    this.earlyMessageRejectionPeriod =
-        earlyMessageRejectionPeriod == null ?
-            Optional.absent() :
-            Optional.of(earlyMessageRejectionPeriod.toStandardDuration());
-    this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : false;
-  }
-
-  @JsonProperty public String getTopic() {
-    return topic;
-  }
+      @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
+      @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
+      @JsonProperty("idleConfig") IdleConfig idleConfig
+  )
+  {
+    super(
+        Preconditions.checkNotNull(topic, "topic"),
+        inputFormat,
+        replicas,
+        taskCount,
+        taskDuration,
+        startDelay,
+        period,
+        useEarliestOffset,
+        completionTimeout,
+        lateMessageRejectionPeriod,
+        earlyMessageRejectionPeriod,
+        autoScalerConfig,
+        lateMessageRejectionStartDateTime,
+        idleConfig
+    );

-  @JsonProperty public Integer getReplicas() {
-    return replicas;
-  }
-
-  @JsonProperty public Integer getTaskCount() {
-    return taskCount;
+    this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
+    Preconditions.checkNotNull(
+        consumerProperties.get(BOOTSTRAP_SERVERS_KEY),
+        StringUtils.format("consumerProperties must contain entry for [%s]", BOOTSTRAP_SERVERS_KEY)
+    );
+    this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
+    this.configOverrides = configOverrides;
   }

-  @JsonProperty public Duration getTaskDuration() {
-    return taskDuration;
+  @JsonProperty
+  public String getTopic()
+  {
+    return getStream();
   }

-  @JsonProperty public Map<String, String> getConsumerProperties() {
+  @JsonProperty
+  public Map<String, Object> getConsumerProperties()
+  {
     return consumerProperties;
   }

-  @JsonProperty public Duration getStartDelay() {
-    return startDelay;
+  @JsonProperty
+  public long getPollTimeout()
+  {
+    return pollTimeout;
   }

-  @JsonProperty public Duration getPeriod() {
-    return period;
+  @JsonProperty
+  public boolean isUseEarliestOffset()
+  {
+    return isUseEarliestSequenceNumber();
   }

-  @JsonProperty public boolean isUseEarliestOffset() {
-    return useEarliestOffset;
+  @JsonProperty
+  public KafkaConfigOverrides getConfigOverrides()
+  {
+    return configOverrides;
   }

-  @JsonProperty public Duration getCompletionTimeout() {
-    return completionTimeout;
+  @Override
+  public String toString()
+  {
+    return "KafkaSupervisorIOConfig{" +
+        "topic='" + getTopic() + '\'' +
+        ", replicas=" + getReplicas() +
+        ", taskCount=" + getTaskCount() +
+        ", taskDuration=" + getTaskDuration() +
+        ", consumerProperties=" + consumerProperties +
+        ", autoScalerConfig=" + getAutoScalerConfig() +
+        ", pollTimeout=" + pollTimeout +
+        ", startDelay=" + getStartDelay() +
+        ", period=" + getPeriod() +
+        ", useEarliestOffset=" + isUseEarliestOffset() +
+        ", completionTimeout=" + getCompletionTimeout() +
+        ", earlyMessageRejectionPeriod=" + getEarlyMessageRejectionPeriod() +
+        ", lateMessageRejectionPeriod=" + getLateMessageRejectionPeriod() +
+        ", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() +
+        ", configOverrides=" + getConfigOverrides() +
+        ", idleConfig=" + getIdleConfig() +
+        '}';
   }

-  @SuppressWarnings("Guava") @JsonProperty public Optional<Duration> getEarlyMessageRejectionPeriod() {
-    return earlyMessageRejectionPeriod;
-  }
-
-  @SuppressWarnings("Guava") @JsonProperty public Optional<Duration> getLateMessageRejectionPeriod() {
-    return lateMessageRejectionPeriod;
-  }
-
-  @JsonProperty public boolean isSkipOffsetGaps() {
-    return skipOffsetGaps;
-  }
-
-  @Override public String toString() {
-    return "KafkaSupervisorIOConfig{"
-        + "topic='"
-        + topic
-        + '\''
-        + ", replicas="
-        + replicas
-        + ", taskCount="
-        + taskCount
-        + ", taskDuration="
-        + taskDuration
-        + ", consumerProperties="
-        + consumerProperties
-        + ", startDelay="
-        + startDelay
-        + ", period="
-        + period
-        + ", useEarliestOffset="
-        + useEarliestOffset
-        + ", completionTimeout="
-        + completionTimeout
-        + ", lateMessageRejectionPeriod="
-        + lateMessageRejectionPeriod
-        + ", skipOffsetGaps="
-        + skipOffsetGaps
-        + '}';
-  }
-
-  private static Duration defaultDuration(final Period period, final String theDefault) {
-    return (period == null ? new Period(theDefault) : period).toStandardDuration();
-  }
-
-  @Override public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    KafkaSupervisorIOConfig that = (KafkaSupervisorIOConfig) o;
-
-    if (useEarliestOffset != that.useEarliestOffset) {
-      return false;
-    }
-    if (skipOffsetGaps != that.skipOffsetGaps) {
-      return false;
-    }
-    if (topic != null ? !topic.equals(that.topic) : that.topic != null) {
-      return false;
-    }
-    if (replicas != null ? !replicas.equals(that.replicas) : that.replicas != null) {
-      return false;
-    }
-    if (taskCount != null ? !taskCount.equals(that.taskCount) : that.taskCount != null) {
-      return false;
-    }
-    if (taskDuration != null ? !taskDuration.equals(that.taskDuration) : that.taskDuration != null) {
-      return false;
-    }
-    if (consumerProperties != null ?
-        !consumerProperties.equals(that.consumerProperties) :
-        that.consumerProperties != null) {
-      return false;
-    }
-    if (startDelay != null ? !startDelay.equals(that.startDelay) : that.startDelay != null) {
-      return false;
-    }
-    if (period != null ? !period.equals(that.period) : that.period != null) {
-      return false;
-    }
-    if (completionTimeout != null ?
-        !completionTimeout.equals(that.completionTimeout) :
-        that.completionTimeout != null) {
-      return false;
-    }
-    if (lateMessageRejectionPeriod.isPresent() ?
-        !lateMessageRejectionPeriod.equals(that.lateMessageRejectionPeriod) :
-        that.lateMessageRejectionPeriod.isPresent()) {
-      return false;
-    }
-    return earlyMessageRejectionPeriod.isPresent() ?
-        earlyMessageRejectionPeriod.equals(that.earlyMessageRejectionPeriod) :
-        !that.earlyMessageRejectionPeriod.isPresent();
-  }
-
-  @Override public int hashCode() {
-    int result = topic != null ? topic.hashCode() : 0;
-    result = 31 * result + (replicas != null ? replicas.hashCode() : 0);
-    result = 31 * result + (taskCount != null ? taskCount.hashCode() : 0);
-    result = 31 * result + (taskDuration != null ? taskDuration.hashCode() : 0);
-    result = 31 * result + (consumerProperties != null ? consumerProperties.hashCode() : 0);
-    result = 31 * result + (startDelay != null ? startDelay.hashCode() : 0);
-    result = 31 * result + (period != null ? period.hashCode() : 0);
-    result = 31 * result + (useEarliestOffset ? 1 : 0);
-    result = 31 * result + (completionTimeout != null ? completionTimeout.hashCode() : 0);
-    result = 31 * result + (lateMessageRejectionPeriod.isPresent() ? lateMessageRejectionPeriod.hashCode() : 0);
-    result = 31 * result + (earlyMessageRejectionPeriod.isPresent() ? earlyMessageRejectionPeriod.hashCode() : 0);
-    result = 31 * result + (skipOffsetGaps ? 1 : 0);
-    return result;
-  }
 }
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java
deleted file mode 100644
index bca1c7896848..000000000000
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReport.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.druid.json;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Lists;
-import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
-import org.joda.time.DateTime;
-
-import javax.annotation.Nullable;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * This class is copied from druid source code
- * in order to avoid adding additional dependencies on druid-indexing-service.
- */
-public class KafkaSupervisorReport extends SupervisorReport<KafkaSupervisorReport.KafkaSupervisorReportPayload> {
-  /**
-   * Report Payload class.
-   */
-  public static class KafkaSupervisorReportPayload {
-    private final String dataSource;
-    private final String topic;
-    private final Integer partitions;
-    private final Integer replicas;
-    private final Long durationSeconds;
-    private final List<TaskReportData> activeTasks;
-    private final List<TaskReportData> publishingTasks;
-    private final Map<Integer, Long> latestOffsets;
-    private final Map<Integer, Long> minimumLag;
-    private final Long aggregateLag;
-    private final DateTime offsetsLastUpdated;
-
-    @JsonCreator public KafkaSupervisorReportPayload(@JsonProperty("dataSource") String dataSource,
-        @JsonProperty("topic") String topic,
-        @JsonProperty("partitions") Integer partitions,
-        @JsonProperty("replicas") Integer replicas,
-        @JsonProperty("durationSeconds") Long durationSeconds,
-        @Nullable @JsonProperty("latestOffsets") Map<Integer, Long> latestOffsets,
-        @Nullable @JsonProperty("minimumLag") Map<Integer, Long> minimumLag,
-        @Nullable @JsonProperty("aggregateLag") Long aggregateLag,
-        @Nullable @JsonProperty("offsetsLastUpdated") DateTime offsetsLastUpdated) {
-      this.dataSource = dataSource;
-      this.topic = topic;
-      this.partitions = partitions;
-      this.replicas = replicas;
-      this.durationSeconds = durationSeconds;
-      this.activeTasks = Lists.newArrayList();
-      this.publishingTasks = Lists.newArrayList();
-      this.latestOffsets = latestOffsets;
-      this.minimumLag = minimumLag;
-      this.aggregateLag = aggregateLag;
-      this.offsetsLastUpdated = offsetsLastUpdated;
-    }
-
-    @JsonProperty public String getDataSource() {
-      return dataSource;
-    }
-
-    @JsonProperty public String getTopic() {
-      return topic;
-    }
-
-    @JsonProperty public Integer getPartitions() {
-      return partitions;
-    }
-
-    @JsonProperty public Integer getReplicas() {
-      return replicas;
-    }
-
-    @JsonProperty public Long getDurationSeconds() {
-      return durationSeconds;
-    }
-
-    @JsonProperty public List<TaskReportData> getActiveTasks() {
-      return activeTasks;
-    }
-
-    @JsonProperty public List<TaskReportData> getPublishingTasks() {
-      return publishingTasks;
-    }
-
-    @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) public Map<Integer, Long> getLatestOffsets() {
-      return latestOffsets;
-    }
-
-    @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) public Map<Integer, Long> getMinimumLag() {
-      return minimumLag;
-    }
-
-    @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) public Long getAggregateLag() {
-      return aggregateLag;
-    }
-
-    @JsonProperty public DateTime getOffsetsLastUpdated() {
-      return offsetsLastUpdated;
-    }
-
-    @Override public String toString() {
-      return "{"
-          + "dataSource='"
-          + dataSource
-          + '\''
-          + ", topic='"
-          + topic
-          + '\''
-          + ", partitions="
-          + partitions
-          + ", replicas="
-          + replicas
-          + ", durationSeconds="
-          + durationSeconds
-          + ", active="
-          + activeTasks
-          + ", publishing="
-          + publishingTasks
-          + (latestOffsets != null ? ", latestOffsets=" + latestOffsets : "")
-          + (minimumLag != null ? ", minimumLag=" + minimumLag : "")
-          + (aggregateLag != null ? ", aggregateLag=" + aggregateLag : "")
-          + (offsetsLastUpdated != null ? ", offsetsLastUpdated=" + offsetsLastUpdated : "")
-          + '}';
-    }
-  }
-
-  private final KafkaSupervisorReportPayload payload;
-
-  @JsonCreator public KafkaSupervisorReport(@JsonProperty("id") String id,
-      @JsonProperty("generationTime") DateTime generationTime,
-      @JsonProperty("payload") KafkaSupervisorReportPayload payload) {
-    super(id, generationTime, payload);
-    this.payload = payload;
-  }
-
-  public KafkaSupervisorReport(String dataSource,
-      DateTime generationTime,
-      String topic,
-      Integer partitions,
-      Integer replicas,
-      Long durationSeconds,
-      @Nullable Map<Integer, Long> latestOffsets,
-      @Nullable Map<Integer, Long> minimumLag,
-      @Nullable Long aggregateLag,
-      @Nullable DateTime offsetsLastUpdated) {
-    this(dataSource,
-        generationTime,
-        new KafkaSupervisorReportPayload(dataSource,
-            topic,
-            partitions,
-            replicas,
-            durationSeconds,
-            latestOffsets,
-            minimumLag,
-            aggregateLag,
-            offsetsLastUpdated));
-  }
-
-  @Override public KafkaSupervisorReportPayload getPayload() {
-    return payload;
-  }
-
-  @Override public String toString() {
-    return "{" + "id='" + getId() + '\'' + ", generationTime=" + getGenerationTime() + ", payload=" + payload + '}';
-  }
-
-  @Override public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    if (!super.equals(o)) {
-      return false;
-    }
-    KafkaSupervisorReport that = (KafkaSupervisorReport) o;
-    return Objects.equals(payload, that.payload);
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(super.hashCode(), payload);
-  }
-}
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReportPayload.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReportPayload.java
new file mode 100644
index 000000000000..c7fbb5ea1735
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/KafkaSupervisorReportPayload.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.json;
+
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is copied from druid source code
+ * in order to avoid adding additional dependencies on druid-indexing-service.
+ */
+public class KafkaSupervisorReportPayload extends SeekableStreamSupervisorReportPayload<Integer, Long>
+{
+  public KafkaSupervisorReportPayload(
+      String dataSource,
+      String topic,
+      int partitions,
+      int replicas,
+      long durationSeconds,
+      @Nullable Map<Integer, Long> latestOffsets,
+      @Nullable Map<Integer, Long> minimumLag,
+      @Nullable Long aggregateLag,
+      @Nullable DateTime offsetsLastUpdated,
+      boolean suspended,
+      boolean healthy,
+      SupervisorStateManager.State state,
+      SupervisorStateManager.State detailedState,
+      List<SupervisorStateManager.ExceptionEvent> recentErrors
+  )
+  {
+    super(
+        dataSource,
+        topic,
+        partitions,
+        replicas,
+        durationSeconds,
+        latestOffsets,
+        minimumLag,
+        aggregateLag,
+        null,
+        null,
+        offsetsLastUpdated,
+        suspended,
+        healthy,
+        state,
+        detailedState,
+        recentErrors
+    );
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaSupervisorReportPayload{" +
+        "dataSource='" + getDataSource() + '\'' +
+        ", topic='" + getStream() + '\'' +
+        ", partitions=" + getPartitions() +
+        ", replicas=" + getReplicas() +
+        ", durationSeconds=" + getDurationSeconds() +
+        ", active=" + getActiveTasks() +
+        ", publishing=" + getPublishingTasks() +
+        (getLatestOffsets() != null ? ", latestOffsets=" + getLatestOffsets() : "") +
+        (getMinimumLag() != null ? ", minimumLag=" + getMinimumLag() : "") +
+        (getAggregateLag() != null ? ", aggregateLag=" + getAggregateLag() : "") +
+        (getOffsetsLastUpdated() != null ? ", sequenceLastUpdated=" + getOffsetsLastUpdated() : "") +
+        ", suspended=" + isSuspended() +
+        ", healthy=" + isHealthy() +
+        ", state=" + getState() +
+        ", detailedState=" + getDetailedState() +
+        ", recentErrors=" + getRecentErrors() +
+        '}';
+  }
+}
\ No newline at end of file
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java
index 289f0e8d4308..9b0e5eeb8cc5 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamIndexTaskTuningConfig.java
@@ -21,6 +21,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.TuningConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@@ -35,12 +36,15 @@
  * This class is copied from druid source code
  * in order to avoid adding additional dependencies on druid-indexing-service.
  */
-public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfig, AppenderatorConfig {
+public abstract class SeekableStreamIndexTaskTuningConfig implements AppenderatorConfig
+{
   private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
   private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false;

+  private final AppendableIndexSpec appendableIndexSpec;
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
+  private final boolean skipBytesInMemoryOverheadCheck;
   private final DynamicPartitionsSpec partitionsSpec;
   private final Period intermediatePersistPeriod;
   private final File basePersistDirectory;
@@ -61,8 +65,10 @@ public abstract class SeekableStreamIndexTaskTuningConfig
   private final int maxSavedParseExceptions;

   public SeekableStreamIndexTaskTuningConfig(
+      @Nullable AppendableIndexSpec appendableIndexSpec,
       @Nullable Integer maxRowsInMemory,
       @Nullable Long maxBytesInMemory,
+      @Nullable Boolean skipBytesInMemoryOverheadCheck,
       @Nullable Integer maxRowsPerSegment,
       @Nullable Long maxTotalRows,
       @Nullable Period intermediatePersistPeriod,
@@ -70,8 +76,6 @@ public SeekableStreamIndexTaskTuningConfig(
       @Nullable Integer maxPendingPersists,
       @Nullable IndexSpec indexSpec,
       @Nullable IndexSpec indexSpecForIntermediatePersists,
-      // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
-      @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
       @Deprecated @Nullable Boolean reportParseExceptions,
       @Nullable Long handoffConditionTimeout,
       @Nullable Boolean resetOffsetAutomatically,
@@ -81,171 +85,198 @@ public SeekableStreamIndexTaskTuningConfig(
       @Nullable Boolean logParseExceptions,
       @Nullable Integer maxParseExceptions,
       @Nullable Integer maxSavedParseExceptions
-  ) {
+  )
+  {
     // Cannot be a static because default basePersistDirectory is unique per-instance
     final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);

+    this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
     this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
     this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
     // initializing this to 0, it will be lazily initialized to a value
-    // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
+    // @see #getMaxBytesInMemoryOrDefault()
     this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
+    this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck == null ?
+        DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK : skipBytesInMemoryOverheadCheck;
     this.intermediatePersistPeriod = intermediatePersistPeriod == null
-                                     ? defaults.getIntermediatePersistPeriod()
-                                     : intermediatePersistPeriod;
-    this.basePersistDirectory = defaults.getBasePersistDirectory();
+        ? defaults.getIntermediatePersistPeriod()
+        : intermediatePersistPeriod;
+    this.basePersistDirectory = basePersistDirectory;
     this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists;
     this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
     this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
-                                            this.indexSpec : indexSpecForIntermediatePersists;
+        this.indexSpec : indexSpecForIntermediatePersists;
     this.reportParseExceptions = reportParseExceptions == null
-                                 ? defaults.isReportParseExceptions()
-                                 : reportParseExceptions;
+        ? defaults.isReportParseExceptions()
+        : reportParseExceptions;
     this.handoffConditionTimeout = handoffConditionTimeout == null
-                                   ? defaults.getHandoffConditionTimeout()
-                                   : handoffConditionTimeout;
+        ? defaults.getHandoffConditionTimeout()
+        : handoffConditionTimeout;
     this.resetOffsetAutomatically = resetOffsetAutomatically == null
-                                    ? DEFAULT_RESET_OFFSET_AUTOMATICALLY
-                                    : resetOffsetAutomatically;
+        ? DEFAULT_RESET_OFFSET_AUTOMATICALLY
+        : resetOffsetAutomatically;
     this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
     this.intermediateHandoffPeriod = intermediateHandoffPeriod == null
-                                     ? new Period().withDays(Integer.MAX_VALUE)
-                                     : intermediateHandoffPeriod;
+        ? new Period().withDays(Integer.MAX_VALUE)
+        : intermediateHandoffPeriod;
     this.skipSequenceNumberAvailabilityCheck = skipSequenceNumberAvailabilityCheck == null
-                                               ? DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK
-                                               : skipSequenceNumberAvailabilityCheck;
+        ? DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK
+        : skipSequenceNumberAvailabilityCheck;

     if (this.reportParseExceptions) {
       this.maxParseExceptions = 0;
       this.maxSavedParseExceptions = maxSavedParseExceptions == null ? 0 : Math.min(1, maxSavedParseExceptions);
     } else {
       this.maxParseExceptions = maxParseExceptions == null
-                                ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS
-                                : maxParseExceptions;
+          ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS
+          : maxParseExceptions;
       this.maxSavedParseExceptions = maxSavedParseExceptions == null
-                                     ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS
-                                     : maxSavedParseExceptions;
+          ? TuningConfig.DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS
+          : maxSavedParseExceptions;
     }
     this.logParseExceptions = logParseExceptions == null
-                              ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS
-                              : logParseExceptions;
+        ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS
+        : logParseExceptions;
   }

   @Override
   @JsonProperty
-  public int getMaxRowsInMemory() {
+  public AppendableIndexSpec getAppendableIndexSpec()
+  {
+    return appendableIndexSpec;
+  }
+
+  @Override
+  @JsonProperty
+  public int getMaxRowsInMemory()
+  {
     return maxRowsInMemory;
   }

   @Override
   @JsonProperty
-  public long getMaxBytesInMemory() {
+  public long getMaxBytesInMemory()
+  {
     return maxBytesInMemory;
   }

+  @JsonProperty
+  @Override
+  public boolean isSkipBytesInMemoryOverheadCheck()
+  {
+    return skipBytesInMemoryOverheadCheck;
+  }
+
   @Override
   @JsonProperty
-  public Integer getMaxRowsPerSegment() {
+  public Integer getMaxRowsPerSegment()
+  {
     return partitionsSpec.getMaxRowsPerSegment();
   }

   @JsonProperty
   @Override
   @Nullable
-  public Long getMaxTotalRows() {
+  public Long getMaxTotalRows()
+  {
     return partitionsSpec.getMaxTotalRows();
   }

   @Override
-  public DynamicPartitionsSpec getPartitionsSpec() {
+  public DynamicPartitionsSpec getPartitionsSpec()
+  {
     return partitionsSpec;
   }

   @Override
   @JsonProperty
-  public Period getIntermediatePersistPeriod() {
+  public Period getIntermediatePersistPeriod()
+  {
     return intermediatePersistPeriod;
   }

   @Override
-  @JsonProperty
-  public File getBasePersistDirectory() {
+  public File getBasePersistDirectory()
+  {
     return basePersistDirectory;
   }

   @Override
   @JsonProperty
   @Deprecated
-  public int getMaxPendingPersists() {
+  public int getMaxPendingPersists()
+  {
     return maxPendingPersists;
   }

   @Override
   @JsonProperty
-  public IndexSpec getIndexSpec() {
+  public IndexSpec getIndexSpec()
+  {
     return indexSpec;
   }

   @JsonProperty
   @Override
-  public IndexSpec getIndexSpecForIntermediatePersists() {
+  public IndexSpec getIndexSpecForIntermediatePersists()
+  {
     return indexSpecForIntermediatePersists;
   }

-  /**
-   * Always returns true, doesn't affect the version being built.
-   */
-  @Deprecated
-  @JsonProperty
-  public boolean getBuildV9Directly() {
-    return true;
-  }
-
   @Override
   @JsonProperty
-  public boolean isReportParseExceptions() {
+  public boolean isReportParseExceptions()
+  {
     return reportParseExceptions;
   }

   @JsonProperty
-  public long getHandoffConditionTimeout() {
+  public long getHandoffConditionTimeout()
+  {
     return handoffConditionTimeout;
   }

   @JsonProperty
-  public boolean isResetOffsetAutomatically() {
+  public boolean isResetOffsetAutomatically()
+  {
     return resetOffsetAutomatically;
   }

   @Override
   @JsonProperty
   @Nullable
-  public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() {
+  public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
+  {
     return segmentWriteOutMediumFactory;
   }

   @JsonProperty
-  public Period getIntermediateHandoffPeriod() {
+  public Period getIntermediateHandoffPeriod()
+  {
     return intermediateHandoffPeriod;
   }

   @JsonProperty
-  public boolean isLogParseExceptions() {
+  public boolean isLogParseExceptions()
+  {
     return logParseExceptions;
   }

   @JsonProperty
-  public int getMaxParseExceptions() {
+  public int getMaxParseExceptions()
+  {
     return maxParseExceptions;
   }

   @JsonProperty
-  public int getMaxSavedParseExceptions() {
+  public int getMaxSavedParseExceptions()
+  {
     return maxSavedParseExceptions;
   }

   @JsonProperty
-  public boolean isSkipSequenceNumberAvailabilityCheck() {
+  public boolean isSkipSequenceNumberAvailabilityCheck()
+  {
     return skipSequenceNumberAvailabilityCheck;
   }

@@ -253,7 +284,8 @@ public boolean isSkipSequenceNumberAvailabilityCheck() {
   public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir);

   @Override
-  public boolean equals(Object o) {
+  public boolean equals(Object o)
+  {
     if (this == o) {
       return true;
     }
@@ -261,30 +293,35 @@ public boolean equals(Object o) {
       return false;
     }
     SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o;
-    return maxRowsInMemory == that.maxRowsInMemory &&
-        maxBytesInMemory == that.maxBytesInMemory &&
-        maxPendingPersists == that.maxPendingPersists &&
-        reportParseExceptions == that.reportParseExceptions &&
-        handoffConditionTimeout == that.handoffConditionTimeout &&
-        resetOffsetAutomatically == that.resetOffsetAutomatically &&
-        skipSequenceNumberAvailabilityCheck == that.skipSequenceNumberAvailabilityCheck &&
-        logParseExceptions == that.logParseExceptions &&
-        maxParseExceptions == that.maxParseExceptions &&
-        maxSavedParseExceptions == that.maxSavedParseExceptions &&
-        Objects.equals(partitionsSpec, that.partitionsSpec) &&
-        Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) &&
-        Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
-        Objects.equals(indexSpec, that.indexSpec) &&
-        Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) &&
-        Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) &&
-        Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod);
+    return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
+        maxRowsInMemory == that.maxRowsInMemory &&
+        maxBytesInMemory == that.maxBytesInMemory &&
+        skipBytesInMemoryOverheadCheck == that.skipBytesInMemoryOverheadCheck &&
+        maxPendingPersists == that.maxPendingPersists &&
+        reportParseExceptions == that.reportParseExceptions &&
+        handoffConditionTimeout == that.handoffConditionTimeout &&
+        resetOffsetAutomatically == that.resetOffsetAutomatically &&
+        skipSequenceNumberAvailabilityCheck == that.skipSequenceNumberAvailabilityCheck &&
+        logParseExceptions == that.logParseExceptions &&
+        maxParseExceptions == that.maxParseExceptions &&
+        maxSavedParseExceptions == that.maxSavedParseExceptions &&
+        Objects.equals(partitionsSpec, that.partitionsSpec) &&
+        Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) &&
+        Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
+        Objects.equals(indexSpec, that.indexSpec) &&
+        Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) &&
+        Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) &&
+        Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod);
   }

   @Override
-  public int hashCode() {
+  public int hashCode()
+  {
     return Objects.hash(
+        appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
+        skipBytesInMemoryOverheadCheck,
         partitionsSpec,
         intermediatePersistPeriod,
         basePersistDirectory,
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java
index ec71518f46c8..31e0549f06cd 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/SeekableStreamSupervisorTuningConfig.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.druid.json;

 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
 import org.joda.time.Duration;
 import org.joda.time.Period;

@@ -26,17 +27,22 @@
  * This class is copied from druid source code
  * in order to avoid adding additional dependencies on druid-indexing-service.
  */
-public interface SeekableStreamSupervisorTuningConfig {
-
+public interface SeekableStreamSupervisorTuningConfig
+{
+  boolean DEFAULT_ASYNC = true;
+  String DEFAULT_OFFSET_FETCH_PERIOD = "PT30S";
   int DEFAULT_CHAT_RETRIES = 8;
   String DEFAULT_HTTP_TIMEOUT = "PT10S";
   String DEFAULT_SHUTDOWN_TIMEOUT = "PT80S";
   String DEFAULT_REPARTITION_TRANSITION_DURATION = "PT2M";

-  static Duration defaultDuration(final Period period, final String theDefault) {
+  static Duration defaultDuration(final Period period, final String theDefault)
+  {
     return (period == null ? new Period(theDefault) : period).toStandardDuration();
   }

+  boolean getChatAsync();
+
   @JsonProperty
   Integer getWorkerThreads();

@@ -55,5 +61,8 @@ static Duration defaultDuration(final Period period, final String theDefault) {
   @JsonProperty
   Duration getRepartitionTransitionDuration();

+  @JsonProperty
+  Duration getOffsetFetchPeriod();
+
   SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig();
-}
+}
\ No newline at end of file
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
index f35db6d01cba..7cd93773684f 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/json/TaskReportData.java
@@ -20,40 +20,37 @@
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import org.joda.time.DateTime;

-import java.util.Map;
-
 import javax.annotation.Nullable;
+import java.util.Map;

 /**
  * This class is copied from druid source code
  * in order to avoid adding additional dependencies on druid-indexing-service.
*/ -@SuppressWarnings("unused") public class TaskReportData { - /** - * Task type used by serializer but does not have any functionality as far i can tell. - */ - @SuppressWarnings("unused") public enum TaskType { - ACTIVE, PUBLISHING, UNKNOWN - } - +@SuppressWarnings("unused") public class TaskReportData +{ private final String id; - private final Map startingOffsets; + private final Map startingOffsets; private final DateTime startTime; private final Long remainingSeconds; private final TaskType type; - private final Map currentOffsets; - private final Map lag; - - public TaskReportData(String id, - @Nullable Map startingOffsets, - @Nullable Map currentOffsets, - DateTime startTime, + private final Map currentOffsets; + private final Map lag; + private final Map lagMillis; + + public TaskReportData( + String id, + @Nullable Map startingOffsets, + @Nullable Map currentOffsets, + @Nullable DateTime startTime, Long remainingSeconds, TaskType type, - @Nullable Map lag) { + @Nullable Map lag, + @Nullable Map lagMillis + ) + { this.id = id; this.startingOffsets = startingOffsets; this.currentOffsets = currentOffsets; @@ -61,42 +58,83 @@ public TaskReportData(String id, this.remainingSeconds = remainingSeconds; this.type = type; this.lag = lag; + this.lagMillis = lagMillis; } - @JsonProperty public String getId() { + @JsonProperty + public String getId() + { return id; } - @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) public Map getStartingOffsets() { + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getStartingOffsets() + { return startingOffsets; } - @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) public Map getCurrentOffsets() { + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getCurrentOffsets() + { return currentOffsets; } - @JsonProperty public DateTime getStartTime() { + @JsonProperty + public DateTime getStartTime() + { return startTime; } - @JsonProperty public Long getRemainingSeconds() { + @JsonProperty + public Long getRemainingSeconds() + { return remainingSeconds; } - @JsonProperty public TaskType getType() { + @JsonProperty + public TaskType getType() + { return type; } - @JsonProperty @JsonInclude(JsonInclude.Include.NON_NULL) public Map getLag() { + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getLag() + { return lag; } - @Override public String toString() { - return "{" + "id='" + id + '\'' + (startingOffsets != null ? ", startingOffsets=" + startingOffsets : "") + ( - currentOffsets != null ? - ", currentOffsets=" + currentOffsets : - "") + ", startTime=" + startTime + ", remainingSeconds=" + remainingSeconds + (lag != null ? - ", lag=" + lag : - "") + '}'; + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map getLagMillis() + { + return lagMillis; + } + + @Override + public String toString() + { + return "{" + + "id='" + id + '\'' + + (startingOffsets != null ? ", startingOffsets=" + startingOffsets : "") + + (currentOffsets != null ? ", currentOffsets=" + currentOffsets : "") + + ", startTime=" + startTime + + ", remainingSeconds=" + remainingSeconds + + (lag != null ? ", lag=" + lag : "") + + (lagMillis != null ? 
", lagMillis=" + lagMillis : "") + + '}'; + } + + /** + * Used by the Supervisor to report status of tasks + * ACTIVE - task is waiting to be started, started, or reading + * PUBLISHING - task is publishing or registering handoff + * UNNKNOWN - unknown + */ + @SuppressWarnings("unused") public enum TaskType + { + ACTIVE, PUBLISHING, UNKNOWN } -} +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 0d9fb17e16fd..55d3df5a3590 100644 --- a/pom.xml +++ b/pom.xml @@ -132,7 +132,7 @@ 10.17.1.0 3.1.0 0.1.2 - 0.17.1 + 27.0.0 2.2.4 1.12.0 22.0