Add Hive MetaStore support to kafka-connect-s3 #572

Open · wants to merge 5 commits into base: master
Changes from all commits
9 changes: 9 additions & 0 deletions .gitignore
@@ -1,6 +1,7 @@
# Build products
target/
build/
kafka-connect-s3/dependency-reduced-pom.xml

# IntelliJ data
*.iml
@@ -12,6 +13,14 @@ build/
.project
.settings/

# Netbeans
nb-configuration.xml
nbactions.xml

# Hive metastore
metastore_db/
derby.log

# Documentation build output
/docs/_build

968 changes: 919 additions & 49 deletions kafka-connect-s3/pom.xml

Large diffs are not rendered by default.

45 changes: 45 additions & 0 deletions kafka-connect-s3/src/assembly/hive.xml
@@ -0,0 +1,45 @@
<assembly
    xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
  <id>hive</id>
  <formats>
    <format>dir</format>
    <format>tar.gz</format>
  </formats>
  <includeBaseDirectory>false</includeBaseDirectory>
  <fileSets>
    <fileSet>
      <directory>${project.parent.basedir}</directory>
      <outputDirectory>doc/</outputDirectory>
      <includes>
        <include>version.txt</include>
        <include>README*</include>
        <include>LICENSE*</include>
      </includes>
    </fileSet>
    <fileSet>
      <directory>${project.basedir}</directory>
      <outputDirectory>doc/</outputDirectory>
      <includes>
        <include>licenses.html</include>
        <include>licenses/</include>
        <include>notices/</include>
      </includes>
    </fileSet>
    <fileSet>
      <directory>${project.basedir}/config</directory>
      <outputDirectory>etc/</outputDirectory>
      <includes>
        <include>*</include>
      </includes>
    </fileSet>
    <fileSet>
      <directory>${project.basedir}/target</directory>
      <outputDirectory>lib/</outputDirectory>
      <includes>
        <include>${project.name}-*-shaded.jar</include>
      </includes>
    </fileSet>
  </fileSets>
</assembly>
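
Note: this new hive assembly packages the shaded jar (which carries the Hive MetaStore dependencies) as a separate distribution. The stock package.xml below is updated to exclude those Hive-only jars, so the default package appears intended to stay unchanged.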
13 changes: 13 additions & 0 deletions kafka-connect-s3/src/assembly/package.xml
@@ -64,6 +64,19 @@
<exclude>io.confluent:kafka-connect-storage-core</exclude>
<exclude>io.confluent:kafka-connect-storage-format</exclude>
<exclude>io.confluent:kafka-connect-storage-partitioner</exclude>
<!-- Exclude jars which are only needed for Hive MetaStore integration -->
<exclude>ch.qos.reload4j:reload4j</exclude>
<exclude>com.amazonaws:aws-java-sdk-bundle</exclude>
<exclude>com.lmax:disruptor</exclude>
<exclude>io.confluent:kafka-connect-storage-hive</exclude>
<exclude>org.apache.avro:avro</exclude>
<exclude>org.apache.hadoop:hadoop-aws</exclude>
<exclude>org.apache.hadoop:hadoop-common</exclude>
<exclude>org.apache.hive:hive-common</exclude>
<exclude>org.apache.hive:hive-exec</exclude>
<exclude>org.apache.hive:hive-storage-api</exclude>
<exclude>org.apache.hive.hcatalog:hive-hcatalog-core</exclude>
<exclude>org.apache.parquet:parquet-hadoop-bundle</exclude>
</excludes>
</dependencySet>
</dependencySets>
kafka-connect-s3/src/main/java/io/confluent/connect/s3/S3SinkConnectorConfig.java

@@ -66,6 +66,9 @@
import io.confluent.connect.storage.common.ParentValueRecommender;
import io.confluent.connect.storage.common.StorageCommonConfig;
import io.confluent.connect.storage.format.Format;
import io.confluent.connect.storage.hive.HiveConfig;
import static io.confluent.connect.storage.hive.HiveConfig.HIVE_DATABASE_CONFIG;
import static io.confluent.connect.storage.hive.HiveConfig.HIVE_INTEGRATION_CONFIG;
import io.confluent.connect.storage.partitioner.DailyPartitioner;
import io.confluent.connect.storage.partitioner.DefaultPartitioner;
import io.confluent.connect.storage.partitioner.FieldPartitioner;
@@ -225,6 +228,19 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
new ParentValueRecommender(
STORE_KAFKA_HEADERS_CONFIG, true, HEADERS_FORMAT_CLASS_VALID_VALUES);

private static final String TOPIC_SUBSTITUTION = "${topic}";
public static final String HIVE_TABLE_NAME_CONFIG = "hive.table.name";
public static final String HIVE_TABLE_NAME_DEFAULT = TOPIC_SUBSTITUTION;
private static final String HIVE_TABLE_NAME_DOC = "The Hive table name to use. "
+ "It must contain '${topic}' to inject the corresponding topic name.";
private static final String HIVE_TABLE_NAME_DISPLAY = "Hive table name";

public static final String HIVE_S3_PROTOCOL_CONFIG = "hive.s3.protocol";
public static final String HIVE_S3_PROTOCOL_DEFAULT = "s3";
public static final String HIVE_S3_PROTOCOL_DOC = "The protocol used when generating Hive "
+ "table LOCATION.";
public static final String HIVE_S3_PROTOCOL_DISPLAY = "Hive S3 protocol";

static {
STORAGE_CLASS_RECOMMENDER.addValidValues(
Collections.singletonList(S3Storage.class)
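
As a sketch of how the two new keys combine with the existing Hive options: the following hypothetical connector configuration enables the integration. The hive.integration, hive.metastore.uris, and hive.database keys come from the shared storage HiveConfig (the class imported above); hive.table.name and hive.s3.protocol are the keys added by this PR. Bucket, topic, and metastore values are placeholders.

    import java.util.HashMap;
    import java.util.Map;

    public class HiveSinkConfigExample {
      public static Map<String, String> props() {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "io.confluent.connect.s3.S3SinkConnector");
        props.put("topics", "page.views-v1");                             // placeholder topic
        props.put("s3.bucket.name", "my-bucket");                         // placeholder bucket
        props.put("hive.integration", "true");                            // from HiveConfig
        props.put("hive.metastore.uris", "thrift://metastore-host:9083"); // from HiveConfig; placeholder URI
        props.put("hive.database", "default");                            // from HiveConfig
        props.put("hive.table.name", "${topic}");                         // added by this PR (default)
        props.put("hive.s3.protocol", "s3a");                             // added by this PR
        return props;
      }
    }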
@@ -716,27 +732,63 @@ public static ConfigDef newConfigDef() {
Width.LONG,
"Elastic buffer initial capacity"
);
}

{
final String group = "Hive";
int orderInGroup = 0;
configDef.define(
HIVE_TABLE_NAME_CONFIG,
Type.STRING,
HIVE_TABLE_NAME_DEFAULT,
Importance.LOW,
HIVE_TABLE_NAME_DOC,
group,
++orderInGroup,
Width.SHORT,
HIVE_TABLE_NAME_DISPLAY
);

configDef.define(
HIVE_S3_PROTOCOL_CONFIG,
Type.STRING,
HIVE_S3_PROTOCOL_DEFAULT,
Importance.LOW,
HIVE_S3_PROTOCOL_DOC,
group,
++orderInGroup,
Width.SHORT,
HIVE_S3_PROTOCOL_DISPLAY
);
}
return configDef;
}


public S3SinkConnectorConfig(Map<String, String> props) {
this(newConfigDef(), props);
}

protected S3SinkConnectorConfig(ConfigDef configDef, Map<String, String> props) {
super(configDef, props);

this.name = parseName(originalsStrings());

ConfigDef storageCommonConfigDef = StorageCommonConfig.newConfigDef(STORAGE_CLASS_RECOMMENDER);
StorageCommonConfig commonConfig = new StorageCommonConfig(
storageCommonConfigDef,
originalsStrings()
);
addToGlobal(commonConfig);

ConfigDef partitionerConfigDef = PartitionerConfig.newConfigDef(PARTITIONER_CLASS_RECOMMENDER);
PartitionerConfig partitionerConfig = new PartitionerConfig(partitionerConfigDef,
originalsStrings());

addToGlobal(partitionerConfig);

HiveConfig hiveConfig = new HiveConfig(originalsStrings());
addToGlobal(hiveConfig);

addToGlobal(this);
validateTimezone();
}
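
Once the constructor has folded HiveConfig into the global config, the Hive keys resolve through the same S3SinkConnectorConfig instance. A usage sketch, building on the hypothetical props above:

    // The composed config exposes the Hive settings via the accessors added below.
    S3SinkConnectorConfig config = new S3SinkConnectorConfig(HiveSinkConfigExample.props());
    boolean hiveEnabled = config.hiveIntegrationEnabled();        // reads hive.integration
    String database = config.hiveDatabase();                      // reads hive.database
    String table = config.getHiveTableName("page.views-v1");      // -> "page_views_v1"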
@@ -1164,6 +1216,7 @@ public static ConfigDef getConfig() {
addAllConfigKeys(visible, newConfigDef(), skip);
addAllConfigKeys(visible, StorageCommonConfig.newConfigDef(STORAGE_CLASS_RECOMMENDER), skip);
addAllConfigKeys(visible, PartitionerConfig.newConfigDef(PARTITIONER_CLASS_RECOMMENDER), skip);
addAllConfigKeys(visible, HiveConfig.getConfig(), skip);

return visible;
}
@@ -1180,6 +1233,39 @@ public String nullValueBehavior() {
return getString(BEHAVIOR_ON_NULL_VALUES_CONFIG);
}

public boolean hiveIntegrationEnabled() {
return getBoolean(HIVE_INTEGRATION_CONFIG);
}

public String hiveDatabase() {
return getString(HIVE_DATABASE_CONFIG);
}

private String getNormalizeHiveTableName(final String topicName) {
if (topicName != null) {
return topicName.replaceAll("[.-]", "_");
}
return topicName;
}

/**
* Performs all substitutions on {@value #HIVE_TABLE_NAME_CONFIG} and calculates the final
* Hive table name for the given topic.
*
* @param topic the topic name
* @return the Hive table name
*/
public String getHiveTableName(String topic) {
return getString(HIVE_TABLE_NAME_CONFIG).replace(
"${topic}",
getNormalizeHiveTableName(topic)
);
}
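
For example, because Hive table names cannot contain dots or dashes, the normalization above replaces both with underscores before substituting into the template:

    // With hive.table.name = "${topic}" (the default):
    config.getHiveTableName("page.views-v1");   // -> "page_views_v1"
    // With hive.table.name = "connect_${topic}":
    config.getHiveTableName("clicks");          // -> "connect_clicks"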

public String getHiveS3Protocol() {
return getString(HIVE_S3_PROTOCOL_CONFIG);
}
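
For illustration: with hive.s3.protocol set to "s3a", a generated table LOCATION would take a form like s3a://my-bucket/topics/page_views_v1/ rather than the default s3://... form. Bucket and path here are placeholders; the LOCATION construction itself lives in the Hive integration code, which is outside this diff.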

public enum IgnoreOrFailBehavior {
IGNORE,
FAIL;