Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oauthbearer token refresh callback #410

Merged
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
fd865fd
configure oauthbearer_token_refresh_callback
bruce-szalwinski-he Feb 5, 2024
3349950
config specs
bruce-szalwinski-he Feb 5, 2024
7177f5e
config specs
bruce-szalwinski-he Feb 5, 2024
c596aa6
Merge branch 'main' into oauthbearer_token_refresh_callback
bruce-szalwinski-he Feb 7, 2024
c55d383
add set_token / set_token_failure bindings
bruce-szalwinski-he Feb 7, 2024
cd83676
add set_token / set_token_failure bindings
bruce-szalwinski-he Feb 7, 2024
05c02e4
Merge branch 'oauthbearer_token_refresh_callback' of https://github.c…
bruce-szalwinski-he Feb 7, 2024
0bf54f5
pass client pointer to callback
bruce-szalwinski-he Feb 7, 2024
e0bad42
Merge branch 'main' into oauthbearer_token_refresh_callback
mensfeld Feb 13, 2024
a4015fa
Merge branch 'main' into oauthbearer_token_refresh_callback
bruce-szalwinski-he Feb 13, 2024
ac2121d
adjust types for set token
bruce-szalwinski-he Feb 14, 2024
d889648
adjust types for set token
bruce-szalwinski-he Feb 14, 2024
8142a37
Merge branch 'oauthbearer_token_refresh_callback' of https://github.c…
bruce-szalwinski-he Feb 14, 2024
c84db70
back to string
bruce-szalwinski-he Feb 14, 2024
b315d0b
back to string
bruce-szalwinski-he Feb 14, 2024
1cad89a
back to pointer
bruce-szalwinski-he Feb 15, 2024
b1bd10e
int -> int64
bruce-szalwinski-he Feb 15, 2024
a133f6a
cleanup set token tests
bruce-szalwinski-he Feb 15, 2024
2276a7d
Merge branch 'main' into oauthbearer_token_refresh_callback
mensfeld Feb 29, 2024
387e5c9
expose oauthbearer_set_token on producer
bruce-szalwinski-he Feb 29, 2024
10df30d
Merge branch 'oauthbearer_token_refresh_callback' of https://github.c…
bruce-szalwinski-he Feb 29, 2024
a15d95d
oauthbearer_set_token specs
bruce-szalwinski-he Mar 1, 2024
f98687a
flatten extensions
bruce-szalwinski-he Mar 1, 2024
dcfa9ef
add set token ability to consumer and admin, refactor into helper
bruce-szalwinski-he Mar 1, 2024
5705fee
pass native-kafka as client, make helper a class
bruce-szalwinski-he Mar 1, 2024
b3b9d8b
expect type of client
bruce-szalwinski-he Mar 1, 2024
7ecc6e5
pass client-id in oauth callback
bruce-szalwinski-he Mar 1, 2024
947eb2c
refactor helper include
bruce-szalwinski-he Mar 1, 2024
172e349
comment out specs for oauthbearer_set_token when sasl is configured
bruce-szalwinski-he Mar 1, 2024
0a6babf
remove client id for now
bruce-szalwinski-he Mar 2, 2024
c989728
pass client-name to oauth callback
bruce-szalwinski-he Mar 3, 2024
e51542c
make args match across tests
bruce-szalwinski-he Mar 3, 2024
c545a1c
refactor oauthbearer_set_token as a mixin
bruce-szalwinski-he Mar 4, 2024
2ef64c3
close and free
bruce-szalwinski-he Mar 9, 2024
90c3872
close instead of free
bruce-szalwinski-he Mar 10, 2024
0ef2350
add TestConsumer that closes and destroys when done
bruce-szalwinski-he Mar 10, 2024
3b7b04c
Merge branch 'main' into oauthbearer_token_refresh_callback
mensfeld Mar 12, 2024
8a093ed
names are hard
bruce-szalwinski-he Mar 12, 2024
86cba70
clarify callback scope
bruce-szalwinski-he Mar 12, 2024
61b5615
Merge branch 'oauthbearer_token_refresh_callback' of https://github.c…
bruce-szalwinski-he Mar 12, 2024
faec83b
Merge branch 'main' into oauthbearer_token_refresh_callback
mensfeld Mar 17, 2024
a8df2ab
docs on extension_size math
bruce-szalwinski-he Mar 19, 2024
ea5f7e5
Merge branch 'oauthbearer_token_refresh_callback' of https://github.c…
bruce-szalwinski-he Mar 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ ext/librdkafka.*
doc
coverage
vendor
.idea/
out/
1 change: 1 addition & 0 deletions lib/rdkafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

require "rdkafka/version"
require "rdkafka/helpers/time"
require "rdkafka/helpers/oauth"
require "rdkafka/abstract_handle"
require "rdkafka/admin"
require "rdkafka/admin/create_topic_handle"
Expand Down
2 changes: 2 additions & 0 deletions lib/rdkafka/admin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

module Rdkafka
class Admin
include Helpers::OAuth

# @private
def initialize(native_kafka)
@native_kafka = native_kafka
Expand Down
32 changes: 31 additions & 1 deletion lib/rdkafka/bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def self.lib_extension

RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS = -175
RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS = -174
RD_KAFKA_RESP_ERR__STATE = -172
RD_KAFKA_RESP_ERR__NOENT = -156
RD_KAFKA_RESP_ERR_NO_ERROR = 0

Expand Down Expand Up @@ -111,7 +112,10 @@ class TopicPartitionList < FFI::Struct
callback :error_cb, [:pointer, :int, :string, :pointer], :void
attach_function :rd_kafka_conf_set_error_cb, [:pointer, :error_cb], :void
attach_function :rd_kafka_rebalance_protocol, [:pointer], :string

callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void
attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void
attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int64, :pointer, :pointer, :int, :pointer, :int], :int
attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :string], :int
# Log queue
attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void
attach_function :rd_kafka_queue_get_main, [:pointer], :pointer
Expand Down Expand Up @@ -161,6 +165,32 @@ class TopicPartitionList < FFI::Struct
end
end

# The OAuth callback is currently global and contextless.
# This means that the callback will be called for all instances, and the callback must be able to determine to which instance it is associated.
# The instance name will be provided in the callback, allowing the callback to reference the correct instance.
#
# An example of how to use the instance name in the callback is given below.
# The `refresh_token` is configured as the `oauthbearer_token_refresh_callback`.
# `instances` is a map of client names to client instances, maintained by the user.
#
# ```
# def refresh_token(config, client_name)
# client = instances[client_name]
# client.oauthbearer_set_token(
# token: 'new-token-value',
# lifetime_ms: token-lifetime-ms,
# principal_name: 'principal-name'
# )
# end
# ```
OAuthbearerTokenRefreshCallback = FFI::Function.new(
bruce-szalwinski-he marked this conversation as resolved.
Show resolved Hide resolved
:void, [:pointer, :string, :pointer]
) do |client_ptr, config, _opaque|
if Rdkafka::Config.oauthbearer_token_refresh_callback
Rdkafka::Config.oauthbearer_token_refresh_callback.call(config, Rdkafka::Bindings.rd_kafka_name(client_ptr))
end
end

# Handle

enum :kafka_type, [
Expand Down
24 changes: 23 additions & 1 deletion lib/rdkafka/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ class Config
@@opaques = ObjectSpace::WeakMap.new
# @private
@@log_queue = Queue.new
# @private
# We memoize thread on the first log flush
# This allows us also to restart logger thread on forks
@@log_thread = nil
# @private
@@log_mutex = Mutex.new
# @private
@@oauthbearer_token_refresh_callback = nil

# Returns the current logger, by default this is a logger to stdout.
#
Expand Down Expand Up @@ -104,6 +105,24 @@ def self.error_callback
@@error_callback
end

# Sets the SASL/OAUTHBEARER token refresh callback.
# This callback will be triggered when it is time to refresh the client's OAUTHBEARER token
#
# @param callback [Proc, #call] The callback
#
# @return [nil]
def self.oauthbearer_token_refresh_callback=(callback)
raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback == nil
@@oauthbearer_token_refresh_callback = callback
end

# Returns the current oauthbearer_token_refresh_callback callback, by default this is nil.
#
# @return [Proc, nil]
def self.oauthbearer_token_refresh_callback
@@oauthbearer_token_refresh_callback
end

# @private
def self.opaques
@@opaques
Expand Down Expand Up @@ -300,6 +319,9 @@ def native_config(opaque = nil)

# Set error callback
Rdkafka::Bindings.rd_kafka_conf_set_error_cb(config, Rdkafka::Bindings::ErrorCallback)

# Set oauth callback
Rdkafka::Bindings.rd_kafka_conf_set_oauthbearer_token_refresh_cb(config, Rdkafka::Bindings::OAuthbearerTokenRefreshCallback)
end
end

Expand Down
1 change: 1 addition & 0 deletions lib/rdkafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ module Rdkafka
class Consumer
include Enumerable
include Helpers::Time
include Helpers::OAuth

# @private
def initialize(native_kafka)
Expand Down
45 changes: 45 additions & 0 deletions lib/rdkafka/helpers/oauth.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
module Rdkafka
module Helpers

module OAuth

# Set the OAuthBearer token
bruce-szalwinski-he marked this conversation as resolved.
Show resolved Hide resolved
#
# @param token [String] the mandatory token value to set, often (but not necessarily) a JWS compact serialization as per https://tools.ietf.org/html/rfc7515#section-3.1.
# @param lifetime_ms [Integer] when the token expires, in terms of the number of milliseconds since the epoch. See https://currentmillis.com/.
# @param principal_name [String] the mandatory Kafka principal name associated with the token.
# @param extensions [Hash] optional SASL extensions key-value pairs to be communicated to the broker as additional key-value pairs during the initial client response as per https://tools.ietf.org/html/rfc7628#section-3.1.
# @return [Integer] 0 on success
def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil)
error_buffer = FFI::MemoryPointer.from_string(" " * 256)
@native_kafka.with_inner do |inner|
response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(
inner, token, lifetime_ms, principal_name,
flatten_extensions(extensions), extension_size(extensions), error_buffer, 256
)
if response != 0
Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(
inner,
"Failed to set token: #{error_buffer.read_string}"
)
end

response
end
end

private

# Flatten the extensions hash into a string according to the spec, https://datatracker.ietf.org/doc/html/rfc7628#section-3.1
def flatten_extensions(extensions)
return nil unless extensions
"\x01#{extensions.map { |e| e.join("=") }.join("\x01")}"
end

def extension_size(extensions)
return 0 unless extensions
extensions.size * 2
mensfeld marked this conversation as resolved.
Show resolved Hide resolved
end
end
end
end
1 change: 1 addition & 0 deletions lib/rdkafka/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ module Rdkafka
# A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that.
class Producer
include Helpers::Time
include Helpers::OAuth

# Cache partitions count for 30 seconds
PARTITIONS_COUNT_TTL = 30
Expand Down
37 changes: 37 additions & 0 deletions spec/rdkafka/admin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -404,4 +404,41 @@
end
end
end

describe '#oauthbearer_set_token' do
context 'when sasl not configured' do
it 'should return RD_KAFKA_RESP_ERR__STATE' do
response = admin.oauthbearer_set_token(
token: "foo",
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
principal_name: "kafka-cluster"
)
expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE)
end
end

context 'when sasl configured' do
before do
config_sasl = rdkafka_config(
"security.protocol": "sasl_ssl",
"sasl.mechanisms": 'OAUTHBEARER'
)
$admin_sasl = config_sasl.admin
end

after do
$admin_sasl.close
end

it 'should succeed' do

response = $admin_sasl.oauthbearer_set_token(
token: "foo",
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
principal_name: "kafka-cluster"
)
expect(response).to eq(0)
end
end
end
end
82 changes: 82 additions & 0 deletions spec/rdkafka/bindings_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,86 @@
end
end
end

describe "oauthbearer set token" do

context "without args" do
it "should raise argument error" do
expect {
Rdkafka::Bindings.rd_kafka_oauthbearer_set_token
}.to raise_error(ArgumentError)
end
end

context "with args" do
before do
DEFAULT_TOKEN_EXPIRY_SECONDS = 900
$token_value = "token"
$md_lifetime_ms = Time.now.to_i*1000 + DEFAULT_TOKEN_EXPIRY_SECONDS * 1000
$md_principal_name = "kafka-cluster"
$extensions = nil
$extension_size = 0
$error_buffer = FFI::MemoryPointer.from_string(" " * 256)
end

it "should set token or capture failure" do
RdKafkaTestConsumer.with do |consumer_ptr|
response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(consumer_ptr, $token_value, $md_lifetime_ms, $md_principal_name, $extensions, $extension_size, $error_buffer, 256)
expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE)
expect($error_buffer.read_string).to eq("SASL/OAUTHBEARER is not the configured authentication mechanism")
end
end
end
end

describe "oauthbearer set token failure" do

context "without args" do

it "should fail" do
expect {
Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure
}.to raise_error(ArgumentError)
end
end

context "with args" do
it "should succeed" do
expect {
errstr = "error"
RdKafkaTestConsumer.with do |consumer_ptr|
Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(consumer_ptr, errstr)
end
}.to_not raise_error
end
end
end

describe "oauthbearer callback" do

context "without an oauthbearer callback" do
it "should do nothing" do
expect {
Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, "", nil)
}.not_to raise_error
end
end

context "with an oauthbearer callback" do
before do
Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |config, client_name|
$received_config = config
$received_client_name = client_name
end
end

it "should call the oauth bearer callback and receive config and client name" do
RdKafkaTestConsumer.with do |consumer_ptr|
Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(consumer_ptr, "{}", nil)
expect($received_config).to eq("{}")
expect($received_client_name).to match(/consumer/)
end
end
end
end
end
33 changes: 33 additions & 0 deletions spec/rdkafka/config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,39 @@ def call(stats); end
end
end

context "oauthbearer calllback" do
context "with a proc/lambda" do
it "should set the callback" do
expect {
Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |config, client_name|
puts config
puts client_name
end
}.not_to raise_error
expect(Rdkafka::Config.oauthbearer_token_refresh_callback).to respond_to :call
end
end

context "with a callable object" do
it "should set the callback" do
callback = Class.new do
def call(config, client_name); end
end

expect {
Rdkafka::Config.oauthbearer_token_refresh_callback = callback.new
}.not_to raise_error
expect(Rdkafka::Config.oauthbearer_token_refresh_callback).to respond_to :call
end
end

it "should not accept a callback that's not callable" do
expect {
Rdkafka::Config.oauthbearer_token_refresh_callback = 'not a callback'
}.to raise_error(TypeError)
end
end

context "configuration" do
it "should store configuration" do
config = Rdkafka::Config.new
Expand Down
36 changes: 36 additions & 0 deletions spec/rdkafka/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1301,4 +1301,40 @@ def collect(name, list)
])
end
end

describe '#oauthbearer_set_token' do
context 'when sasl not configured' do
it 'should return RD_KAFKA_RESP_ERR__STATE' do
response = consumer.oauthbearer_set_token(
token: "foo",
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
principal_name: "kafka-cluster"
)
expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE)
end
end

context 'when sasl configured' do
before do
$consumer_sasl = rdkafka_producer_config(
"security.protocol": "sasl_ssl",
"sasl.mechanisms": 'OAUTHBEARER'
).consumer
end

after do
$consumer_sasl.close
end

it 'should succeed' do

response = $consumer_sasl.oauthbearer_set_token(
token: "foo",
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
principal_name: "kafka-cluster"
)
expect(response).to eq(0)
end
end
end
end
Loading