
Add security layer negotiation to the GSSAPI authentication. #1283

Closed
asdaraujo wants to merge 1 commit

Conversation

asdaraujo
Contributor

When trying to establish a connection with Kafka using SASL with the
GSSAPI authentication mechanism, the connection was hanging and timing out
after 60 seconds. On the Kafka broker side I noticed that the
SaslServerAuthenticator was going from the AUTHENTICATE to the FAILED state.

The GSSAPI auth implementation was missing the second handshake defined in
RFC 2222, which happens after the security context is established. This
handshake is used by the client and server to negotiate the security layer (QoP)
to be used for the connection.

Kafka currently only supports the "auth" QoP, so the implementation in this commit
doesn't make it configurable, but this can be extended later.

With this change I was able to successfully connect to a Kerberos-enabled Kafka
broker using the SASL_PLAINTEXT protocol and the GSSAPI mechanism.
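
For reference, here is a minimal sketch of that final handshake, assuming an established gssapi.SecurityContext; the function name, the authz_id handling, and the SASL_QOP_AUTH constant are illustrative rather than the exact diff:

import struct
import gssapi

SASL_QOP_AUTH = 1  # bitmask bit for the 'auth' (no security layer) QoP

def negotiate_security_layer(client_ctx, server_token, authz_id=b''):
    # After the security context is established, the server sends one more
    # wrapped token: 1 octet of QoP bitmask + 3 octets of max message size.
    msg = client_ctx.unwrap(server_token).message
    # Keep only the 'auth' bit from the QoP mask offered by the server.
    qop = struct.pack('B', SASL_QOP_AUTH & struct.unpack('B', msg[0:1])[0])
    # Reuse the server's proposed max message size and append the authzid.
    reply = qop + msg[1:] + authz_id
    # wrap() with encrypt=False: the reply needs integrity, not privacy.
    return client_ctx.wrap(reply, False).message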

@dpkp
Owner

dpkp commented Oct 25, 2017

Thanks so much for the PR. I'm a little wary of changes to the raw protocol parser, but I'll wait for the tests to pass and then see whether it impacts benchmark performance.

Do you have any quick docs that we could add re: setting up a client for gssapi connection? Also, any suggestions re: putting additional configuration in kafka-python, like keytab etc? I assume that you are setting these externally via environment variable or system defaults?

@asdaraujo
Contributor Author

No worries. I understand your concerns about changing things at such a low level.

Nevertheless, without the change I could not connect to a Kerberos-enabled broker over SASL_PLAINTEXT using GSSAPI. The connection would hang forever because the broker would keep waiting for the QoP negotiation. How have you tested the GSSAPI authentication? Have you ever run into this situation? It would be good to understand the environment where it was tested before, to see if I can reproduce it in my own cluster.

BTW, the cluster I'm testing this on is running the Cloudera Distribution of Apache Kafka 2.1.1, which is based on Kafka 0.10.0.0 with a bunch of fixes on top of it.

All the changes I made were derived directly from RFC 2222. I wouldn't call that a quick doc, though :) Everything needed for this change is covered in sections 7.2.[1-3], and it's explained quite well there. Let me know if I can help make it clearer.

@asdaraujo
Contributor Author

Oops, sorry, I misunderstood your question about the documentation.
I'll create a quick document to explain what I did to test and I'll post it here.

@asdaraujo
Contributor Author

@dpkp , here's what I used to test the connections before and after my changes:

from kafka import KafkaProducer

params = ...
producer = KafkaProducer(**params)
msg = 'Hello, World!'
# 'test-topic' stands in for the topic name (originally read from a config
# dict); encode() keeps this working on both Python 2 and Python 3
producer.send('test-topic', msg.encode('utf-8'))
producer.flush()
producer.close()

The only thing that needs to change between plain and SASL-authenticated connections is the set of parameters passed to the producer. Without SASL, I used these parameters:

params = {
  'bootstrap_servers': ['broker1:9092','broker2:9092'],
  'security_protocol': 'PLAINTEXT',
}

To connect to a broker using SASL, the parameters need to be changed to this:

params = {
  'bootstrap_servers': ['broker1:9092','broker2:9092'],
  'security_protocol': 'SASL_PLAINTEXT',
  'sasl_mechanism': 'GSSAPI',
}

To authenticate successfully with the broker, the user needs to acquire a valid Kerberos ticket. Both of the methods below work fine with the Python example:

  1. Kinit manually beforehand:

     kinit alice
     python test-producer.py

  2. Set the KRB5_CLIENT_KTNAME env variable pointing to a valid keytab:

     export KRB5_CLIENT_KTNAME=/home/alice/alice.keytab
     python test-producer.py

Adding a keytab option to kafka-python would be a plus but not essential, since it can easily be worked around as shown above. Currently the QoP level set by this patch is hard-coded to 'auth', since that's the only one supported by Kafka. It could easily be parameterized later if/when Kafka is extended to support other QoP levels. I'm not sure that's on the roadmap, though, since Kafka already supports SASL_SSL, which provides integrity and confidentiality.

For reference, this is my SASL-enabled broker configuration:

        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        authorizer.class.name =
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id = 82
        broker.id.generation.enable = false
        broker.rack = null
        compression.type = producer
        connections.max.idle.ms = 600000
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.socket.timeout.ms = 30000
        default.replication.factor = 1
        delete.topic.enable = true
        fetch.purgatory.purge.interval.requests = 1000
        group.max.session.timeout.ms = 300000
        group.min.session.timeout.ms = 6000
        host.name =
        inter.broker.protocol.version = 0.10.0-IV1
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listeners = SASL_PLAINTEXT://broker-1.gce.example.com:9092,
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 604800000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.threads = 1
        log.cleanup.policy = delete
        log.dir = /tmp/kafka-logs
        log.dirs = /var/local/kafka/data
        log.flush.interval.messages = 9223372036854775807
        log.flush.interval.ms = null
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.format.version = 0.10.0-IV1
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 168
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides =
        message.max.bytes = 1000000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 3
        num.partitions = 1
        num.recovery.threads.per.data.dir = 1
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 1440
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 3
        offsets.topic.segment.bytes = 104857600
        port = 9092
        principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
        producer.purgatory.purge.interval.requests = 1000
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 10000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = kafka
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism.inter.broker.protocol = GSSAPI
        security.inter.broker.protocol = SASL_PLAINTEXT
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        ssl.cipher.suites = null
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        unclean.leader.election.enable = false
        zookeeper.connect = broker-4.gce.example.com:2181
        zookeeper.connection.timeout.ms = null
        zookeeper.session.timeout.ms = 6000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000

HTH

@asdaraujo
Contributor Author

I saw the build error above and re-ran the tests on my own machine; they all passed:

Results (80.43s):
     542 passed
      89 skipped
___ summary ___
  py27: commands succeeded
  congratulations :)

Would you know what happened to the automated build?

@asdaraujo
Contributor Author

Hold on a bit with this PR. I'm looking at adding support for SASL integration tests. I noticed that all tests run in PLAINTEXT, and some changes are needed to allow SASL tests.

@tvoinarovskyi
Collaborator

Would you know what happened to the automated build

Don't mind the pypy builds; they do fail from time to time, and we just restart them. You can see the errors on Travis by clicking on the failed check.

@dpkp
Owner

dpkp commented Oct 26, 2017

Test coverage for SASL would be greatly appreciated! The original support was a community submission, which we decided to accept w/o tests hoping that others might find it useful and eventually add test coverage.

try:
    if type(data) is not str:
Collaborator


What about if not isinstance(data, str) rather than limiting yourself with type()?
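
For what it's worth, the difference shows up with str subclasses; a quick illustrative sketch:

class TokenStr(str):
    pass

s = TokenStr('abc')
print(type(s) is str)      # False: the exact-type check rejects subclasses
print(isinstance(s, str))  # True: accepts str and any of its subclasses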

Contributor Author


Yep. I'll change that. Thanks!

@asdaraujo
Contributor Author

Hi, guys,

In regards to the GSSAPI integration tests, should build_integration.sh deploy and configure a local MIT Kerberos instance to be used in the tests?

André

@dpkp
Owner

dpkp commented Nov 6, 2017

The structure so far has been that build_integration.sh is responsible for fetching external, non-Python binaries (the Kafka server distribution) and saving them to a local directory that is not tracked in source control. The test.fixtures module then manages running fixtures using those local binaries. So I'd say that if external binary artifacts are needed to set up GSSAPI integration fixtures, they should be downloaded in build_integration.sh. But the deployment of fixtures should probably be handled in the test.* modules as much as possible.

@jeffwidman
Collaborator

jeffwidman commented Nov 6, 2017

But the deployment of fixtures should probably be handled in the test.* modules as much as possible.

@asdaraujo Given that we use pytest, you might find https://docs.pytest.org/en/latest/fixture.html very convenient. It's beautiful magic in that it reduces a lot of boilerplate code in an optimized fashion (for example, see fixture scopes) while still being pretty clear what's actually happening behind the scenes.
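
For example, a session-scoped fixture starts one expensive resource for the whole run; a hypothetical sketch (FakeBroker is a stand-in, not our actual fixture API):

import pytest

class FakeBroker(object):
    # Stand-in for an expensive external fixture, e.g. a Kafka broker.
    def stop(self):
        pass

@pytest.fixture(scope='session')
def kafka_broker():
    broker = FakeBroker()  # created once for the entire test session
    yield broker           # every test that asks for it shares this instance
    broker.stop()          # torn down once, after the last test finishes

def test_uses_broker(kafka_broker):
    # pytest injects the fixture by matching the argument name
    assert kafka_broker is not None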

And again, thank you for being willing to put some time into this; it is very much appreciated!

@asdaraujo
Contributor Author

Thanks, @dpkp and @jeffwidman .

So I'd say that if there are external binary artifacts that are needed to setup GSSAPI integration fixtures then they should be downloaded in build_integration.sh . But the deployment of fixtures should probably be handled in the test.* modules as much as possible.

I have a few thoughts/questions about this:

About the Kerberos build

The Kerberos "fixture" differs a bit from Kafka in that it needs to be built for the local architecture. I was leaning towards having build_integration.sh use apt/yum to install the Kerberos service and then configure it correctly for use with the integration tests. I believe this would be better, since the available OS packages are already well integrated with the specific OS, and it would avoid having to install development packages altogether (gcc, make, bison, etc...).

The alternative would be to download the Kerberos source from MIT and build it as required, but I think that would be overkill. Assuming the integration tests will always run on Red Hat or Debian systems, installing Kerberos through packages should suffice.

Kerberos instances and configuration

I would like to avoid having multiple Kerberos fixtures running at once on a single server with different ports and configurations. This is very unusual and could lead to unforeseen issues. It also seems unnecessary to me to start/stop the Kerberos KDC server before and after every test.

I believe it's better to have a single Kerberos realm set up and used for all the integration tests. The approach I had in mind was to complete the Kerberos configuration and startup in the
build_integration.sh script and then pass the Kerberos realm and keytab to the integration tests through environment variables, similar to how it's done with KAFKA_VERSION.

The Kerberos server and keytab would then be configured and started only once. When the tests are launched, if the KERBEROS_REALM and KAFKA_KEYTAB variables are set, all the tests will run with Kerberos integration and use GSSAPI authentication. If they aren't set, the same tests will run, but without SASL/GSSAPI. (See the sketch below.)
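
A rough sketch of what that switch could look like in the test setup (the environment variable names follow the proposal above; the helper itself is hypothetical):

import os

def sasl_connection_params():
    # If the Kerberos environment is configured, run the same tests over
    # SASL/GSSAPI; otherwise fall back to plain PLAINTEXT connections.
    if os.environ.get('KERBEROS_REALM') and os.environ.get('KAFKA_KEYTAB'):
        return {'security_protocol': 'SASL_PLAINTEXT',
                'sasl_mechanism': 'GSSAPI'}
    return {'security_protocol': 'PLAINTEXT'}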

A Python fixture could still be used to start/stop Kerberos, but I'm not sure how much value that would add. I'd be inclined to let the OS manage the service, but if you think it's better to have a fixture spawn/stop it, I can look into it. In that case, I wonder if it would be better to build it from source, to ensure the installation prefix is controlled and not OS-dependent.

@asdaraujo Given that we use pytest, you might find https://docs.pytest.org/en/latest/fixture.html very convenient. It's beautiful magic in that it reduces a lot of boilerplate code in an optimized fashion (for example, see fixture scopes) while still being pretty clear what's actually happening behind the scenes.

Yeah, I've seen that. I found some inconsistency in the integration tests; I'm not sure whether it was intentional. Many of the tests use fixtures, but most of the existing integration tests are instead based on TestCase classes, setting up and tearing down fixtures explicitly.

I've decided to break my PR into two: some changes to the tests to pave the way for the GSSAPI tests, and then the GSSAPI changes themselves (this PR). I'm currently working on the improvements to the integration tests.

However, since most of the integration tests were using TestCase classes, I kept that convention and extended from there.

Let me know your thoughts.

@jeffwidman
Collaborator

Re: TestCase, IMHO, a lot of that is just legacy cruft from before we used pytest. Personally, I would rather standardize on pytest fixtures.

I've been meaning to migrate the TestCase classes over to pytest style fixtures, but decided to wait because many of the old SimpleClient/SimpleConsumer/SimpleProducer tests use TestCase, and we are intending to remove those altogether (#1193 / #1196), so no sense migrating them. But that in turn is held up by #633, which probably requires writing an AdminClient (#935) first.

None of these are absolute blockers of each other in any way, it just is the most efficient order to tackle all of these things without doing extra work.

@asdaraujo
Contributor Author

Thanks, @jeffwidman , I'll take that into account.

Let me know your thoughts about the approach for the Kerberos use.

@dpkp
Owner

dpkp commented Dec 8, 2017

I'd love to get this GSSAPI fix merged before next release.

@asdaraujo
Contributor Author

@dpkp I was holding off on this because I wanted to first introduce SASL coverage to the integration tests (#1293). I did test SASL connections with my cluster successfully, but the integration tests all run with PLAINTEXT and don't cover SASL at all. Just trying to be on the safe side.

If you want to go ahead with this before the tests I can finish and push the minor changes mentioned above.

Let me know what you prefer.

# Kafka currently doesn't support integrity or confidentiality security layers, so we
# simply set QoP to 'auth' only (first octet). We reuse the max message size proposed
# by the server
msg = Int8.encode(SASL_QOP_AUTH & Int8.decode(msg[0])) + msg[1:]
Owner


My only concern is that this single call to decode() requires a very big change to the low-level protocol decoding that will touch everything. And while I agree that the protocol decoding probably should not require BytesIO (or some other stream), I am not excited about including that protocol change here. How about Int8.decode(BytesIO(msg[0]))?
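
In other words, keep the stream-based interface of the protocol types and adapt at the call site; a sketch (the example message is made up, and msg[0:1] is used instead of msg[0] to stay bytes-safe on Python 3):

from io import BytesIO
from kafka.protocol.types import Int8

msg = b'\x01\xff\xff\xff'  # example unwrapped token: QoP octet + max size

# Int8.decode() reads from a file-like stream, so wrap the first byte in
# BytesIO rather than changing the protocol types to accept raw bytes.
server_qop = Int8.decode(BytesIO(msg[0:1]))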

@dpkp
Owner

dpkp commented Dec 21, 2017

I still haven't been able to get a functioning Kerberos auth setup locally, but given that the current code is broken I would like to merge these changes as-is. The one change I'm going to make is to revert the protocol decoding changes in favor of a smaller change in the place we need it.

dpkp pushed a commit that referenced this pull request Dec 26, 2017
@dpkp
Owner

dpkp commented Dec 26, 2017

Reverted protocol changes and merged as 4cfeaca
