diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 47100f4..0a071dd 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -11,7 +11,7 @@ jobs:
 
     strategy:
       matrix:
-        python-version: ["3.8", "3.9", "3.10"]
+        python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
       fail-fast: false
 
     steps:
@@ -19,7 +19,7 @@ jobs:
     - uses: actions/checkout@v2
 
     - name: start Kafka and Zookeeper
-      run: docker-compose -f scripts/bitnami-kafka-docker-compose.yml up -d
+      run: docker compose -f scripts/bitnami-kafka-docker-compose.yml up -d
 
     - name: is Kafka running?
       run: docker ps -a
diff --git a/bluesky_kafka/consume.py b/bluesky_kafka/consume.py
index 6ded6f0..bf5797e 100644
--- a/bluesky_kafka/consume.py
+++ b/bluesky_kafka/consume.py
@@ -1,5 +1,6 @@
 import logging
 
+from bluesky_kafka.logging_utils import redact_config
 import msgpack
 import msgpack_numpy as mpn
 
@@ -120,7 +121,7 @@ def __init__(
 
         logger.debug(
             "BlueskyConsumer configuration:\n%s",
-            self._consumer_config,
+            redact_config(self._consumer_config),
         )
         logger.debug("subscribing to Kafka topic(s): %s", topics)
 
@@ -129,9 +130,7 @@ def __init__(
         self.closed = False
 
     def __str__(self):
-        safe_config = dict(self._consumer_config)
-        if "sasl.password" in safe_config:
-            safe_config["sasl.password"] = "****"
+        safe_config = redact_config(self._consumer_config)
         return (
             f"{type(self)}("
             f"topics={self._topics}, "
diff --git a/bluesky_kafka/logging_utils.py b/bluesky_kafka/logging_utils.py
new file mode 100644
index 0000000..793b5ae
--- /dev/null
+++ b/bluesky_kafka/logging_utils.py
@@ -0,0 +1,135 @@
+# See https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html and
+# https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
+CONFIG_KEY_WHITELIST = {
+    ##
+    # -- Common Keys
+    "bootstrap.servers",
+    "client.dns.lookup",
+    "client.id",
+    "connections.max.idle.ms",
+    "receive.buffer.bytes",
+    "request.timeout.ms",
+    # "sasl.client.callback.handler.class",
+    # "sasl.jaas.config",
+    # "sasl.kerberos.service.name",
+    # "sasl.login.callback.handler.class",
+    # "sasl.login.class",
+    # "sasl.mechanism",
+    # "sasl.oauthbearer.jwks.endpoint.url",
+    # "sasl.oauthbearer.token.endpoint.url",
+    "security.protocol",
+    "send.buffer.bytes",
+    "socket.connection.setup.timeout.max.ms",
+    "socket.connection.setup.timeout.ms",
+    # "ssl.enabled.protocols",
+    # "ssl.keystore.type",
+    # "ssl.protocol",
+    # "ssl.provider",
+    # "ssl.truststore.type",
+    "auto.include.jmx.reporter",
+    "enable.metrics.push",
+    "interceptor.classes",
+    "metadata.max.age.ms",
+    "metric.reporters",
+    "metrics.num.samples",
+    "metrics.recording.level",
+    "metrics.sample.window.ms",
+    "reconnect.backoff.max.ms",
+    "reconnect.backoff.ms",
+    "retry.backoff.max.ms",
+    "retry.backoff.ms",
+    # "sasl.kerberos.kinit.cmd",
+    # "sasl.kerberos.min.time.before.relogin",
+    # "sasl.kerberos.ticket.renew.jitter",
+    # "sasl.kerberos.ticket.renew.window.factor",
+    # "sasl.login.connect.timeout.ms",
+    # "sasl.login.read.timeout.ms",
+    # "sasl.login.refresh.buffer.seconds",
+    # "sasl.login.refresh.min.period.seconds",
+    # "sasl.login.refresh.window.factor",
+    # "sasl.login.refresh.window.jitter",
+    # "sasl.login.retry.backoff.max.ms",
+    # "sasl.login.retry.backoff.ms",
+    # "sasl.oauthbearer.clock.skew.seconds",
+    # "sasl.oauthbearer.expected.audience",
+    # "sasl.oauthbearer.expected.issuer",
+    # "sasl.oauthbearer.jwks.endpoint.refresh.ms",
+    # "sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms",
+    # "sasl.oauthbearer.jwks.endpoint.retry.backoff.ms",
+    # "sasl.oauthbearer.scope.claim.name",
+    # "sasl.oauthbearer.sub.claim.name",
+    # "security.providers",
+    # "ssl.cipher.suites",
+    # "ssl.endpoint.identification.algorithm",
+    # "ssl.engine.factory.class",
+    # "ssl.keymanager.algorithm",
+    # "ssl.secure.random.implementation",
+    # "ssl.trustmanager.algorithm",
+    ##
+    # -- Consumer Only Keys
+    "key.deserializer",
+    "value.deserializer",
+    "fetch.min.bytes",
+    "group.id",
+    "group.protocol",
+    "heartbeat.interval.ms",
+    "max.partition.fetch.bytes",
+    "session.timeout.ms",
+    "allow.auto.create.topics",
+    "auto.offset.reset",
+    "default.api.timeout.ms",
+    "enable.auto.commit",
+    "exclude.internal.topics",
+    "fetch.max.bytes",
+    "group.instance.id",
+    "group.remote.assignor",
+    "isolation.level",
+    "max.poll.interval.ms",
+    "max.poll.records",
+    "partition.assignment.strategy",
+    "auto.commit.interval.ms",
+    "check.crcs",
+    "client.rack",
+    "fetch.max.wait.ms",
+    ##
+    # -- Producer Only Keys
+    "key.serializer",
+    "value.serializer",
+    "buffer.memory",
+    "compression.type",
+    "retries",
+    "batch.size",
+    "delivery.timeout.ms",
+    "linger.ms",
+    "max.block.ms",
+    "max.request.size",
+    "partitioner.class",
+    "partitioner.ignore.keys",
+    "acks",
+    "enable.idempotence",
+    "max.in.flight.requests.per.connection",
+    "metadata.max.idle.ms",
+    "partitioner.adaptive.partitioning.enable",
+    "partitioner.availability.timeout.ms",
+    "transaction.timeout.ms",
+    "transactional.id",
+}
+
+
+def redact_config(config):
+    """
+    Takes a consumer/producer config dictionary and makes sure that anything potentially sensitive
+    is masked out by asterisks so it can be safely logged.
+
+    Parameters
+    ----------
+    config : dict
+        Dictionary of configuration information used to construct a consumer or producer.
+
+    Returns
+    -------
+    dict
+        A new dictionary with the same keys as ``config``; the value of every key that is not in
+        ``CONFIG_KEY_WHITELIST`` is replaced by ``"****"``. ``config`` itself is not modified.
+    """
+    return {k: v if k in CONFIG_KEY_WHITELIST else "****" for k, v in config.items()}
diff --git a/bluesky_kafka/produce.py b/bluesky_kafka/produce.py
index 1b284b0..6d4be0a 100644
--- a/bluesky_kafka/produce.py
+++ b/bluesky_kafka/produce.py
@@ -1,5 +1,6 @@
 import logging
 
+from bluesky_kafka.logging_utils import redact_config
 import msgpack
 import msgpack_numpy as mpn
 
@@ -119,7 +120,7 @@ def __init__(
         else:
             self._producer_config["bootstrap.servers"] = ",".join(bootstrap_servers)
 
-        logger.debug("producer configuration: %s", self._producer_config)
+        logger.debug("producer configuration: %s", redact_config(self._producer_config))
 
         if on_delivery is None:
             self.on_delivery = default_delivery_report
@@ -130,9 +131,7 @@ def __init__(
         self._serializer = serializer
 
     def __str__(self):
-        safe_config = dict(self._producer_config)
-        if "sasl.password" in safe_config:
-            safe_config["sasl.password"] = "****"
+        safe_config = redact_config(self._producer_config)
         return (
             f"{type(self)}("
             f"topic='{self.topic}', "
diff --git a/bluesky_kafka/tests/test_logging_utils.py b/bluesky_kafka/tests/test_logging_utils.py
new file mode 100644
index 0000000..4f23ecd
--- /dev/null
+++ b/bluesky_kafka/tests/test_logging_utils.py
@@ -0,0 +1,29 @@
+from bluesky_kafka.logging_utils import redact_config
+
+
+def test_redact_config():
+    example_config = {
+        "bootstrap.servers": "some-kafka-server:9092",
+        "sasl.mechanisms": "PLAIN",
+        "sasl.username": "brokerUser",
+        "security.protocol": "SASL_SSL",
+        "ssl.ca.location": "/opt/kafka/config/certs/kafka-tls-ca",
+        "sasl.password": "SECRET PASSWORD",
+        "acks": "all",  # Producer key
+        "enable.idempotence": "false",
+        "group.id": "some-group",  # Consumer key
+    }
+
+    redacted_example_config = redact_config(example_config)
+
+    assert redacted_example_config == {
+        "bootstrap.servers": "some-kafka-server:9092",
+        "sasl.mechanisms": "****",
+        "sasl.username": "****",
+        "security.protocol": "SASL_SSL",
+        "ssl.ca.location": "****",
+        "sasl.password": "****",
+        "acks": "all",
+        "enable.idempotence": "false",
+        "group.id": "some-group",
+    }
diff --git a/scripts/start_kafka.sh b/scripts/start_kafka.sh
index df2bafa..9f201cf 100644
--- a/scripts/start_kafka.sh
+++ b/scripts/start_kafka.sh
@@ -1 +1 @@
-sudo docker-compose -f bitnami-kafka-docker-compose.yml up
\ No newline at end of file
+sudo docker compose -f bitnami-kafka-docker-compose.yml up
\ No newline at end of file