Skip to content

Commit

Permalink
[improve][connector-v2][kafka-connector] Add e2e code for Kafka Kerbe…
Browse files Browse the repository at this point in the history
…ros authentication (#8159)
  • Loading branch information
zhangshenghang authored Nov 28, 2024
1 parent 9cdaacd commit 2b821e4
Show file tree
Hide file tree
Showing 9 changed files with 931 additions and 0 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

zookeeper.connect=kafkacluster:2181
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://kafkacluster:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
broker.id=1
log.dirs=/var/lib/kafka/data
num.partitions=1
default.replication.factor=1
offsets.topic.num.partitions=1
offsets.topic.replication.factor=1
java.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
serviceName="kafka"
keyTab="/tmp/kafka.keytab"
useKeyTab=true
storeKey=true
principal="kafka/kafkacluster@EXAMPLE.COM";
};

KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/tmp/kafka.keytab"
principal="kafka/kafkacluster@EXAMPLE.COM";
};


Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"

#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
FakeSource {
row.num = 10
map.size = 10
array.size = 10
bytes.length = 10
string.length = 10
schema = {
fields {
c_map = "map<string, smallint>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

sink {
Kafka {
bootstrap.servers = "kafkacluster:9092"
topic = "test_topic"
format = json
partition_key_fields = ["c_map", "c_string"]
kafka.config = {
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
java.security.krb5.conf="/etc/krb5.conf"
sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required \n useKeyTab=true \n storeKey=true \n keyTab=\"/tmp/kafka.keytab\" \n principal=\"kafka/kafkacluster@EXAMPLE.COM\";"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"

#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
FakeSource {
row.num = 10
map.size = 10
array.size = 10
bytes.length = 10
string.length = 10
schema = {
fields {
c_map = "map<string, smallint>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

sink {
Kafka {
bootstrap.servers = "kafkacluster:9092"
topic = "test_topic"
format = json
partition_key_fields = ["c_map", "c_string"]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"

# You can set spark configuration here
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
}

source {
Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_topic_with_kerberos"
plugin_output = "kafka_table"
start_mode = "earliest"
format_error_handle_way = fail
kafka.config = {
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI
java.security.krb5.conf="/etc/krb5.conf"
sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required \n useKeyTab=true \n storeKey=true \n keyTab=\"/tmp/kafka.keytab\" \n principal=\"kafka/kafkacluster@EXAMPLE.COM\";"
}
schema = {
columns = [
{
name = id
type = bigint
}
{
name = c_map
type = "map<string, smallint>"
}
{
name = c_array
type = "array<tinyint>"
}
{
name = c_string
type = "string"
}
{
name = c_boolean
type = "boolean"
}
{
name = c_tinyint
type = "tinyint"
}
{
name = c_smallint
type = "smallint"
}
{
name = c_int
type = "int"
}
{
name = c_bigint
type = "bigint"
}
{
name = c_float
type = "float"
}
{
name = c_double
type = "double"
}
{
name = c_decimal
type = "decimal(2, 1)"
}
{
name = c_bytes
type = "bytes"
}
{
name = c_date
type = "date"
}
{
name = c_timestamp
type = "timestamp"
}
]
primaryKey = {
name = "primary key"
columnNames = ["id"]
}
constraintKeys = [
{
constraintName = "unique_c_string"
constraintType = UNIQUE_KEY
constraintColumns = [
{
columnName = "c_string"
sortType = ASC
}
]
}
]
}
format = text
field_delimiter = ","
}
}

sink {
Assert {
rules =
{
field_rules = [
{
field_name = id
field_type = bigint
field_value = [
{
rule_type = NOT_NULL
},
{
rule_type = MIN
rule_value = 0
},
{
rule_type = MAX
rule_value = 99
}
]
}
]
row_rules = [
{
rule_type = MIN_ROW
rule_value = 100
}
]
}
}
}
Loading

0 comments on commit 2b821e4

Please sign in to comment.