Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oauthbearer token refresh callback #410

Merged

Conversation

bruce-szalwinski-he
Copy link
Contributor

Took a stab at adding support for the oauth bearer token refresh callback. For a system test, I built a small ruby script, put it in a container, ran the container in ECS. I can see that the callback is being called. It is not clear what actions the callback is supposed to perform. For python, the MskAuthTokenProvider.generate_auth_token function creates a base64 encoded signed url and the kafka-python library uses that to make an authentication request. Meaning that for python, using the OAUTHBEARER sasl mechanism looks like:

class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token('us-east-1')
        return token


tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=list(get_settings().KAFKA_BOOTSTRAP_SERVERS.split(',')),
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id=socket.gethostname(),
)

# send a payload to a topic
producer.send(topic, payload)

For Ruby, so far this looks like:

    def self.start!(kafka_config)
      Rdkafka::Config.oauthbearer_token_refresh_callback = method(:token_callback)
      @producer = Rdkafka::Config.new(kafka_config).producer
    end

The token_callback currently creates a base64 urlencoded signed v4 url, but I can't tell that anything in rdkafka uses that to make an authentication request. Is the callback supposed to implement the authentication request as well?

Closes #406
Mentioned in: karafka/karafka#1767

@mensfeld mensfeld self-requested a review February 7, 2024 10:20
@mensfeld mensfeld added enhancement librdkafka Label for reports / issues related to C librdkafka labels Feb 7, 2024
@mensfeld
Copy link
Member

mensfeld commented Feb 7, 2024

This PR will trigger the callback but it lacks rd_kafka_oauthbearer_set_token and rd_kafka_oauthbearer_set_token_failure according to the flow.

ref https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h#L2191

That said, nice start :)

@bruce-szalwinski-he
Copy link
Contributor Author

This PR will trigger the callback but it lacks rd_kafka_oauthbearer_set_token and rd_kafka_oauthbearer_set_token_failure according to the flow.

ref https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h#L2191

Ok, added bindings for set_token, set_token_failure.

Checking for understanding:

  • the method referenced by oauthbearer_token_refresh_callback should create a token
  • if successful, it should call rd_kafka_oauthbearer_set_token
  • if unsuccessful, it should call rd_kafka_oauthbearer_set_token_failure
  • the oauthbearer_token_refresh_callback will be invoked prior to the token expiring, and the dance begins again.
  • something needs to call the oauthbearer_token_refresh_callback function the first time

@mensfeld
Copy link
Member

mensfeld commented Feb 8, 2024

You got it right.

something needs to call the oauthbearer_token_refresh_callback function the first time

This is already implemented in waterdrop and karafka, so no changes needed (aside from a high-level API but that I will do).

@bruce-szalwinski-he
Copy link
Contributor Author

@kdhfred I've added the Bindings and am working on testing out this feature against AWS MSK. I created some code to generate the tokens, modeling it after how Python is creating credentials. Haven't had any luck yet.

  def generate_token(region: "us-east-1")
    aws_credentials = load_credentials_from_ecs
    construct_auth_token(aws_credentials, region)
  end

  def load_credentials_from_ecs
    ecs_credentials = Aws::ECSCredentials.new(retries: 3)
    ecs_credentials.credentials
  end

  def construct_auth_token(aws_credentials, region)
    endpoint_url = ENDPOINT_URL_TEMPLATE.gsub("{}", region)
    query_params = {"Action": "kafka-cluster:Connect"}
    url = URI::HTTPS.build(host: endpoint_url, path: "/", query: URI.encode_www_form(query_params).to_s)

    signer = Aws::Sigv4::Signer.new(
      service: 'kafka-cluster',
      region: 'us-east-1',
      credentials_provider: aws_credentials
    )

    url = signer.presign_url(
      http_method: 'GET',
      url: url,
      ).to_s

    url_utf_8 = url.encode("UTF-8")
    puts "URL: #{url_utf_8}"
    Base64.urlsafe_encode64(url_utf_8)
  end

@mensfeld
Copy link
Member

@bruce-szalwinski-he do you need my assistance with this?

@bruce-szalwinski-he
Copy link
Contributor Author

@bruce-szalwinski-he do you need my assistance with this?

I've been trying to get a system test working. I need Ruby to create a valid pre-signed URL and I'm stuck. I've been able to create producers in both Python and in Go and they have no trouble creating pre-signed urls. I've fired up an EC2 box where I have a Ruby script that creates pre-signed URL and I feed it to the Python and Go programs and they both fail with some version of

client/metadata got error from broker -1 while fetching metadata: kafka server: SASL Authentication failed: [0bd12d32-5dd8-4602-83d9-b5214a579ea2]: Invalid authentication payload

I've had Go create a pre-signed URL and I feed it to Python and that works fine.

I posted the script and a question over at aws/aws-sdk-ruby#2985.

@bruce-szalwinski-he
Copy link
Contributor Author

Ok, got my pre-signed url working. Now to plug that logic into system test and see if rdkafka-ruby in ecs will publish to kafka.

@bruce-szalwinski-he
Copy link
Contributor Author

bruce-szalwinski-he commented Mar 2, 2024

I think I'm not understanding this part:

We can "just" keep it as it is with a small change, to make this API aligned in behaviour with other callbacks until "total redesign" (recommended by me). The small change would be just providing in the arguments to the callback the instance name (producer/consumer/admin) allowing users that would need to reference their rdkafka instances that way.

The arguments that are passed to the callbacks are defined by librdkafka, correct? I believe the oauth refresh callback is governed by https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h#L2241, with is going to return the client, config, and opaque.

Adding another arg to the callback

    callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer, :string], :void

    OAuthbearerTokenRefreshCallback = FFI::Function.new(
      :void, [:pointer, :string, :pointer, :string]
    ) do |_client_ptr, _config, _opaque, client_id|
      puts("OAuthbearerTokenRefreshCallback called")
      puts()
      puts("client_id: #{client_id}")
      if Rdkafka::Config.oauthbearer_token_refresh_callback
        Rdkafka::Config.oauthbearer_token_refresh_callback.call(client_id)
      end
    end

Results in crash reports.

So perhaps you meant something else?

@bruce-szalwinski-he
Copy link
Contributor Author

@mensfeld thanks for the chat today. I think this is ready now. I left you a few questions about docs.

lib/rdkafka/admin.rb Outdated Show resolved Hide resolved
lib/rdkafka/bindings.rb Show resolved Hide resolved
lib/rdkafka/helpers/oauth.rb Show resolved Hide resolved
lib/rdkafka/bindings.rb Outdated Show resolved Hide resolved
@bruce-szalwinski-he
Copy link
Contributor Author

I can't repro the failure for 3.1

rbenv local 3.1.4
DEBUG_CONSUMER=true bundle exec rspec
337 examples, 0 failures

DEBUG_PRODUCER=true bundle exec rspec
337 examples, 0 failures

@mensfeld
Copy link
Member

mensfeld commented Mar 9, 2024

@bruce-szalwinski-he this error is due to lack of correct shutdown of rdkafka. I will take a look. It's something introduced within this code.

Copy link
Member

@mensfeld mensfeld left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ptrs are not closed

spec/rdkafka/bindings_spec.rb Outdated Show resolved Hide resolved
lib/rdkafka/bindings.rb Outdated Show resolved Hide resolved
spec/spec_helper.rb Outdated Show resolved Hide resolved
@mensfeld mensfeld self-requested a review March 19, 2024 08:52
Copy link
Member

@mensfeld mensfeld left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One last thing I do not understand: why extensions * 2

lib/rdkafka/helpers/oauth.rb Show resolved Hide resolved
@mensfeld mensfeld merged commit a8a94fd into karafka:main Mar 20, 2024
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement librdkafka Label for reports / issues related to C librdkafka
Development

Successfully merging this pull request may close these issues.

how to configure oauthbearer token refresh
2 participants