Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redact potentially sensitive config keys from log messages #62

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ jobs:

strategy:
matrix:
python-version: ["3.8", "3.9", "3.10"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
Copy link
Author

@NoxHarmonium NoxHarmonium Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure if we wanted to remove some of the older versions, looks like Python 3.8 is EOL, but the builds run in parallel so it doesn't seem to hurt to have more versions in there.


fail-fast: false
steps:

- uses: actions/checkout@v2

- name: start Kafka and Zookeeper
run: docker-compose -f scripts/bitnami-kafka-docker-compose.yml up -d
run: docker compose -f scripts/bitnami-kafka-docker-compose.yml up -d

- name: is Kafka running?
run: docker ps -a
Expand Down
7 changes: 3 additions & 4 deletions bluesky_kafka/consume.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from bluesky_kafka.logging_utils import redact_config
import msgpack
import msgpack_numpy as mpn

Expand Down Expand Up @@ -120,7 +121,7 @@ def __init__(

logger.debug(
"BlueskyConsumer configuration:\n%s",
self._consumer_config,
redact_config(self._consumer_config),
)
logger.debug("subscribing to Kafka topic(s): %s", topics)

Expand All @@ -129,9 +130,7 @@ def __init__(
self.closed = False

def __str__(self):
safe_config = dict(self._consumer_config)
if "sasl.password" in safe_config:
safe_config["sasl.password"] = "****"
safe_config = redact_config(self._consumer_config)
return (
f"{type(self)}("
f"topics={self._topics}, "
Expand Down
129 changes: 129 additions & 0 deletions bluesky_kafka/logging_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html and
# https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
CONFIG_KEY_WHITELIST = {
##
# -- Common Keys
"bootstrap.servers",
"client.dns.lookup",
"client.id",
"connections.max.idle.ms",
"receive.buffer.bytes",
"request.timeout.ms",
# "sasl.client.callback.handler.class",
# "sasl.jaas.config",
# "sasl.kerberos.service.name",
# "sasl.login.callback.handler.class",
# "sasl.login.class",
# "sasl.mechanism",
# "sasl.oauthbearer.jwks.endpoint.url",
# "sasl.oauthbearer.token.endpoint.url",
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't usually like having a heap of commented out code like this, but it is handy to see at a glance what keys are getting redacted so I left it in for now.

"security.protocol",
"send.buffer.bytes",
"socket.connection.setup.timeout.max.ms",
"socket.connection.setup.timeout.ms",
# "ssl.enabled.protocols",
# "ssl.keystore.type",
# "ssl.protocol",
# "ssl.provider",
# "ssl.truststore.type",
"auto.include.jmx.reporter",
"enable.metrics.push",
"interceptor.classes",
"metadata.max.age.ms",
"metric.reporters",
"metrics.num.samples",
"metrics.recording.level",
"metrics.sample.window.ms",
"reconnect.backoff.max.ms",
"reconnect.backoff.ms",
"retry.backoff.max.ms",
"retry.backoff.ms",
# "sasl.kerberos.kinit.cmd",
# "sasl.kerberos.min.time.before.relogin",
# "sasl.kerberos.ticket.renew.jitter",
# "sasl.kerberos.ticket.renew.window.factor",
# "sasl.login.connect.timeout.ms",
# "sasl.login.read.timeout.ms",
# "sasl.login.refresh.buffer.seconds",
# "sasl.login.refresh.min.period.seconds",
# "sasl.login.refresh.window.factor",
# "sasl.login.refresh.window.jitter",
# "sasl.login.retry.backoff.max.ms",
# "sasl.login.retry.backoff.ms",
# "sasl.oauthbearer.clock.skew.seconds",
# "sasl.oauthbearer.expected.audience",
# "sasl.oauthbearer.expected.issuer",
# "sasl.oauthbearer.jwks.endpoint.refresh.ms",
# "sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms",
# "sasl.oauthbearer.jwks.endpoint.retry.backoff.ms",
# "sasl.oauthbearer.scope.claim.name",
# "sasl.oauthbearer.sub.claim.name",
# "security.providers",
# "ssl.cipher.suites",
# "ssl.endpoint.identification.algorithm",
# "ssl.engine.factory.class",
# "ssl.keymanager.algorithm",
# "ssl.secure.random.implementation",
# "ssl.trustmanager.algorithm",
##
# -- Consumer Only Keys
"key.deserializer",
"value.deserializer",
"fetch.min.bytes",
"group.id",
"group.protocol",
"heartbeat.interval.ms",
"max.partition.fetch.bytes",
"session.timeout.ms",
"allow.auto.create.topics",
"auto.offset.reset",
"default.api.timeout.ms",
"enable.auto.commit",
"exclude.internal.topics",
"fetch.max.bytes",
"group.instance.id",
"group.remote.assignor",
"isolation.level",
"max.poll.interval.ms",
"max.poll.records",
"partition.assignment.strategy",
"auto.commit.interval.ms",
"check.crcs",
"client.rack",
"fetch.max.wait.ms",
##
# -- Producer Only Keys
"key.serializer",
"value.serializer",
"buffer.memory",
"compression.type",
"retries",
"batch.size",
"delivery.timeout.ms",
"linger.ms",
"max.block.ms",
"max.request.size",
"partitioner.class",
"partitioner.ignore.keys",
"acks",
"enable.idempotence",
"max.in.flight.requests.per.connection",
"metadata.max.idle.ms",
"partitioner.adaptive.partitioning.enable",
"partitioner.availability.timeout.ms",
"transaction.timeout.ms",
"transactional.id",
}


def redact_config(config):
"""
Takes a consumer/producer config dictionary and makes sure that anything potentially sensitive
is masked out by asterisks so it can be safely logged.

Parameters
----------
config : dict
Dictionary of configuration information used to construct a consumer or producer.
"""
return {k: v if k in CONFIG_KEY_WHITELIST else "****" for k, v in config.items()}
7 changes: 3 additions & 4 deletions bluesky_kafka/produce.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from bluesky_kafka.logging_utils import redact_config
import msgpack
import msgpack_numpy as mpn

Expand Down Expand Up @@ -119,7 +120,7 @@ def __init__(
else:
self._producer_config["bootstrap.servers"] = ",".join(bootstrap_servers)

logger.debug("producer configuration: %s", self._producer_config)
logger.debug("producer configuration: %s", redact_config(self._producer_config))

if on_delivery is None:
self.on_delivery = default_delivery_report
Expand All @@ -130,9 +131,7 @@ def __init__(
self._serializer = serializer

def __str__(self):
safe_config = dict(self._producer_config)
if "sasl.password" in safe_config:
safe_config["sasl.password"] = "****"
safe_config = redact_config(self._producer_config)
return (
f"{type(self)}("
f"topic='{self.topic}', "
Expand Down
29 changes: 29 additions & 0 deletions bluesky_kafka/tests/test_logging_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from bluesky_kafka.logging_utils import redact_config


def test_redact_config():
example_config = {
"bootstrap.servers": "some-kafka-server:9092",
"sasl.mechanisms": "PLAIN",
"sasl.username": "brokerUser",
"security.protocol": "SASL_SSL",
"ssl.ca.location": "/opt/kafka/config/certs/kafka-tls-ca",
"sasl.password": "SECRET PASSWORD",
"acks": "all", # Producer key
"enable.idempotence": "false",
"group.id": "some-group", # Consumer key
}

reacted_example_config = redact_config(example_config)

assert reacted_example_config == {
"bootstrap.servers": "some-kafka-server:9092",
"sasl.mechanisms": "****",
"sasl.username": "****",
"security.protocol": "SASL_SSL",
"ssl.ca.location": "****",
"sasl.password": "****",
"acks": "all",
"enable.idempotence": "false",
"group.id": "some-group",
}
Loading