Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change generate kafka connect properties from env #10545

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docker/kafka-setup/env_to_properties.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os
import re
import sys


def env_to_properties(env_prefix: str, properties_file: str):
pattern = re.compile('(?<=[^_])_(?=[^_])')
props = {}

for (env_name, val) in os.environ.items():
if env_name.startswith(env_prefix):
raw_name = env_name[len(env_prefix):].lower()
prop_dot = '.'.join(pattern.split(raw_name))
props[prop_dot] = val

with open(properties_file, 'a') as f:
for k, v in props.items():
f.writelines(f'{k}={v}\n')

Comment on lines +6 to +19
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add error handling and logging.

The function lacks error handling and logging, which are essential for diagnosing issues in production.

def env_to_properties(env_prefix: str, properties_file: str):
    pattern = re.compile('(?<=[^_])_(?=[^_])')
    props = {}

    try:
        for (env_name, val) in os.environ.items():
            if env_name.startswith(env_prefix):
                raw_name = env_name[len(env_prefix):].lower()
                prop_dot = '.'.join(pattern.split(raw_name))
                props[prop_dot] = val

        with open(properties_file, 'a') as f:
            for k, v in props.items():
                f.writelines(f'{k}={v}\n')
    except Exception as e:
        print(f"Error processing environment variables: {e}", file=sys.stderr)
        raise
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def env_to_properties(env_prefix: str, properties_file: str):
pattern = re.compile('(?<=[^_])_(?=[^_])')
props = {}
for (env_name, val) in os.environ.items():
if env_name.startswith(env_prefix):
raw_name = env_name[len(env_prefix):].lower()
prop_dot = '.'.join(pattern.split(raw_name))
props[prop_dot] = val
with open(properties_file, 'a') as f:
for k, v in props.items():
f.writelines(f'{k}={v}\n')
def env_to_properties(env_prefix: str, properties_file: str):
pattern = re.compile('(?<=[^_])_(?=[^_])')
props = {}
try:
for (env_name, val) in os.environ.items():
if env_name.startswith(env_prefix):
raw_name = env_name[len(env_prefix):].lower()
prop_dot = '.'.join(pattern.split(raw_name))
props[prop_dot] = val
with open(properties_file, 'a') as f:
for k, v in props.items():
f.writelines(f'{k}={v}\n')
except Exception as e:
print(f"Error processing environment variables: {e}", file=sys.stderr)
raise


if __name__ == '__main__':
env_prefix = sys.argv[1]
properties_file = sys.argv[2]
env_to_properties(env_prefix, properties_file)
Comment on lines +1 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add tests for env_to_properties.

The function lacks test coverage. Consider adding unit tests to ensure its correctness.

Do you want me to generate the unit testing code or open a GitHub issue to track this task?

40 changes: 1 addition & 39 deletions docker/kafka-setup/kafka-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,8 @@ fi
. kafka-config.sh

echo "bootstrap.servers=$KAFKA_BOOTSTRAP_SERVER" > $CONNECTION_PROPERTIES_PATH
echo "security.protocol=$KAFKA_PROPERTIES_SECURITY_PROTOCOL" >> $CONNECTION_PROPERTIES_PATH

## Add support for SASL_PLAINTEXT
if [[ $KAFKA_PROPERTIES_SECURITY_PROTOCOL == "SASL_PLAINTEXT" ]]; then
echo "sasl.mechanism=$KAFKA_PROPERTIES_SASL_MECHANISM" >> $CONNECTION_PROPERTIES_PATH
echo "sasl.jaas.config=$KAFKA_PROPERTIES_SASL_JAAS_CONFIG" >> $CONNECTION_PROPERTIES_PATH
echo "sasl.kerberos.service.name=$KAFKA_PROPERTIES_SASL_KERBEROS_SERVICE_NAME" >> $CONNECTION_PROPERTIES_PATH
fi

## Add support for SASL_SSL
if [[ $KAFKA_PROPERTIES_SECURITY_PROTOCOL == "SASL_SSL" ]]; then
echo "sasl.jaas.config=$KAFKA_PROPERTIES_SASL_JAAS_CONFIG" >> $CONNECTION_PROPERTIES_PATH
echo "sasl.mechanism=$KAFKA_PROPERTIES_SASL_MECHANISM" >> $CONNECTION_PROPERTIES_PATH
fi

if [[ $KAFKA_PROPERTIES_SECURITY_PROTOCOL == "SSL" ]]; then
if [[ -n $KAFKA_PROPERTIES_SSL_KEYSTORE_LOCATION ]]; then
echo "ssl.keystore.location=$KAFKA_PROPERTIES_SSL_KEYSTORE_LOCATION" >> $CONNECTION_PROPERTIES_PATH
echo "ssl.keystore.password=$KAFKA_PROPERTIES_SSL_KEYSTORE_PASSWORD" >> $CONNECTION_PROPERTIES_PATH
echo "ssl.key.password=$KAFKA_PROPERTIES_SSL_KEY_PASSWORD" >> $CONNECTION_PROPERTIES_PATH
if [[ -n $KAFKA_PROPERTIES_SSL_KEYSTORE_TYPE ]]; then
echo "ssl.keystore.type=$KAFKA_PROPERTIES_SSL_KEYSTORE_TYPE" >> $CONNECTION_PROPERTIES_PATH
fi
fi
if [[ -n $KAFKA_PROPERTIES_SSL_TRUSTSTORE_LOCATION ]]; then
echo "ssl.truststore.location=$KAFKA_PROPERTIES_SSL_TRUSTSTORE_LOCATION" >> $CONNECTION_PROPERTIES_PATH
if [[ $KAFKA_PROPERTIES_SSL_TRUSTSTORE_TYPE != "PEM" ]]; then
echo "ssl.truststore.password=$KAFKA_PROPERTIES_SSL_TRUSTSTORE_PASSWORD" >> $CONNECTION_PROPERTIES_PATH
fi
if [[ -n $KAFKA_PROPERTIES_SSL_TRUSTSTORE_TYPE ]]; then
echo "ssl.truststore.type=$KAFKA_PROPERTIES_SSL_TRUSTSTORE_TYPE" >> $CONNECTION_PROPERTIES_PATH
fi
fi
echo "ssl.endpoint.identification.algorithm=$KAFKA_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM" >> $CONNECTION_PROPERTIES_PATH
fi

# Add support for SASL_CLIENT_CALLBACK_HANDLER_CLASS
if [[ -n "$KAFKA_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS" ]]; then
echo "sasl.client.callback.handler.class=$KAFKA_PROPERTIES_SASL_CLIENT_CALLBACK_HANDLER_CLASS" >> $CONNECTION_PROPERTIES_PATH
fi
python env_to_properties.py KAFKA_PROPERTIES_ $CONNECTION_PROPERTIES_PATH
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Address Shellcheck warning.

The surrounding quotes actually unquote this. Remove or escape them.

- python env_to_properties.py KAFKA_PROPERTIES_ $CONNECTION_PROPERTIES_PATH
+ python env_to_properties.py "KAFKA_PROPERTIES_" "$CONNECTION_PROPERTIES_PATH"
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
python env_to_properties.py KAFKA_PROPERTIES_ $CONNECTION_PROPERTIES_PATH
python env_to_properties.py "KAFKA_PROPERTIES_" "$CONNECTION_PROPERTIES_PATH"


# cub kafka-ready -c $CONNECTION_PROPERTIES_PATH -b $KAFKA_BOOTSTRAP_SERVER 1 180
. kafka-ready.sh
Expand Down
Loading