diff --git a/tutorials/oauth/python/config_utils.py b/tutorials/oauth/python/config_utils.py
new file mode 100644
index 0000000..3313a71
--- /dev/null
+++ b/tutorials/oauth/python/config_utils.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+#
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Copyright 2023 Azure Inc.
+# Licensed under the MIT License.
+# Licensed under the Apache License, Version 2.0
+#
+
+from azure.identity import DefaultAzureCredential
+from functools import partial
+import os
+import requests
+import time
+
+
+TENANT_ID = os.environ.get('AZURE_TENANT_ID')
+CLIENT_ID = os.environ.get('AZURE_CLIENT_ID')
+CLIENT_SECRET = os.environ.get('AZURE_CLIENT_SECRET')
+
+
+def get_oauth_config(namespace):
+    conf = {
+        'bootstrap.servers': '%s:9093' % namespace,
+
+        # Required OAuth2 configuration properties
+        'security.protocol': 'SASL_SSL',
+        'sasl.mechanism': 'OAUTHBEARER'
+    }
+    return conf
+
+
+def get_azure_config(namespace):
+    def oauth_cb(cred, namespace_fqdn, config):
+        # confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)
+
+        # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
+        # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
+        # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param
+
+        access_token = cred.get_token('https://%s/.default' % namespace_fqdn)
+        return access_token.token, access_token.expires_on
+
+    # Azure credential
+    # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview
+    cred = DefaultAzureCredential()
+
+    # Producer configuration
+    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+    conf = get_oauth_config(namespace)
+
+    # the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
+    conf['oauth_cb'] = partial(oauth_cb, cred, namespace)
+    return conf
+
+
+# Using Kafka oauthbearer OIDC semantics that decodes JWT tokens and uses exp claim
+# KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC
+#
+def get_oidc_config(namespace):
+    conf = get_oauth_config(namespace)
+    conf.update({
+        'sasl.oauthbearer.method': 'oidc',
+        'sasl.oauthbearer.client.id': CLIENT_ID,
+        'sasl.oauthbearer.client.secret': CLIENT_SECRET,
+        'sasl.oauthbearer.token.endpoint.url': 'https://login.microsoftonline.com/%s/oauth2/v2.0/token' % TENANT_ID,
+        'sasl.oauthbearer.scope': 'https://%s/.default' % namespace,
+    })
+
+    return conf
+
+
+# Using expires_in field from the token response to treat OAUTHBEARER as opaque
+# and avoid decoding JWT by utilizing RFC9068, RFC6749 concepts.
+# https://datatracker.ietf.org/doc/html/rfc9068#name-privacy-considerations
+# https://www.rfc-editor.org/rfc/rfc6749#section-4.2.2
+#
+def get_opaque_config(namespace):
+    def oauth_cb(config):
+        # take the time before request first
+        token_exp_time = int(time.time())
+
+        token_resp = requests.post(
+            "https://login.microsoftonline.com/%s/oauth2/v2.0/token" % TENANT_ID,
+            auth=(CLIENT_ID, CLIENT_SECRET),
+            data={
+                'grant_type': 'client_credentials',
+                'scope': 'https://%s/.default' % namespace
+            }
+        )
+
+        token_resp = token_resp.json()
+
+        # add expires_in value which is a token validity time
+        # in seconds from the time the response was generated
+        #
+        token_exp_time += int(token_resp['expires_in'])
+
+        return token_resp['access_token'], token_exp_time
+
+    conf = get_oauth_config(namespace)
+    conf['oauth_cb'] = oauth_cb
+
+    return conf
+
+
+
+# Returns producer configs for azure, opaque, oidc modes
+#
+def get_config(namespace, mode):
+    if mode == 'azure':
+        return get_azure_config(namespace)
+    elif mode == 'oidc':
+        return get_oidc_config(namespace)
+    elif mode == 'opaque':
+        return get_opaque_config(namespace)
+
+    raise ValueError("Unsupported mode: '%s'" % mode)
\ No newline at end of file
diff --git a/tutorials/oauth/python/consumer.py b/tutorials/oauth/python/consumer.py
index 829894b..fac5227 100644
--- a/tutorials/oauth/python/consumer.py
+++ b/tutorials/oauth/python/consumer.py
@@ -1,94 +1,22 @@
 #!/usr/bin/env python
 #
 # Copyright (c) Microsoft Corporation. All rights reserved.
-# Copyright 2016 Confluent Inc.
+# Copyright 2023 Confluent Inc.
 # Licensed under the MIT License.
# Licensed under the Apache License, Version 2.0 # # Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems -from azure.identity import DefaultAzureCredential from confluent_kafka import Consumer, KafkaException import sys -import getopt +import argparse import json import logging -from functools import partial from pprint import pformat +import config_utils -def stats_cb(stats_json_str): - stats_json = json.loads(stats_json_str) - print('\nKAFKA Stats: {}\n'.format(pformat(stats_json))) - - -def oauth_cb(cred, namespace_fqdn, config): - # confluent_kafka requires an oauth callback function to return (str, float) with the values of (, ) - - # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc - # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net' - # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param - - access_token = cred.get_token('https://%s/.default' % namespace_fqdn) - return access_token.token, access_token.expires_on - - -def print_usage_and_exit(program_name): - sys.stderr.write( - 'Usage: %s [options..] 
..\n' % program_name) - options = ''' - Options: - -T Enable client statistics at specified interval (ms) -''' - sys.stderr.write(options) - sys.exit(1) - - -if __name__ == '__main__': - optlist, argv = getopt.getopt(sys.argv[1:], 'T:') - if len(argv) < 3: - print_usage_and_exit(sys.argv[0]) - - namespace = argv[0] - group = argv[1] - topics = argv[2:] - - # Azure credential - # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview - cred = DefaultAzureCredential() - - # Consumer configuration - # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md - conf = { - 'bootstrap.servers': '%s:9093' % namespace, - 'group.id': group, - 'session.timeout.ms': 6000, - 'auto.offset.reset': 'earliest', - - # Required OAuth2 configuration properties - 'security.protocol': 'SASL_SSL', - 'sasl.mechanism': 'OAUTHBEARER', - # the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function - 'oauth_cb': partial(oauth_cb, cred, namespace), - } - - # Check to see if -T option exists - for opt in optlist: - if opt[0] != '-T': - continue - try: - intval = int(opt[1]) - except ValueError: - sys.stderr.write("Invalid option value for -T: %s\n" % opt[1]) - sys.exit(1) - - if intval <= 0: - sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1]) - sys.exit(1) - - conf['stats_cb'] = stats_cb - conf['statistics.interval.ms'] = int(opt[1]) - +def consume_workload(conf, topics): # Create logger for consumer (logs will be emitted when poll() is called) logger = logging.getLogger('consumer') logger.setLevel(logging.DEBUG) @@ -127,3 +55,41 @@ def print_assignment(consumer, partitions): finally: # Close down consumer to commit final offsets. 
c.close() + + +def parse_consumer_args(): + parser = argparse.ArgumentParser(description='Process command line arguments.') + parser.add_argument('namespace', help='Eventhubs namespace') + parser.add_argument('group', help='Group') + parser.add_argument('topics', nargs='+', help='Topic1, Topic2, ...') + parser.add_argument('-T', type=int, help='Enable client statistics at specified interval (ms)') + parser.add_argument('--mode', default='azure', choices=['azure', 'oidc', 'opaque'], help='Optional confluent producer configuration mode - azure, oidc, opaque') + + args = parser.parse_args() + + if args.T and args.T <= 0: + sys.stderr.write("-T option value needs to be larger than zero: %s\n" % args.T) + sys.exit(1) + + return args.namespace, args.group, args.topics, args.T, args.mode + + +if __name__ == '__main__': + namespace, group, topics, T, mode = parse_consumer_args() + + conf = config_utils.get_config(namespace, mode) + conf.update({ + 'group.id': group, + 'session.timeout.ms': 6000, + 'auto.offset.reset': 'earliest' + }) + + def stats_cb(stats_json_str): + stats_json = json.loads(stats_json_str) + print('\nKAFKA Stats: {}\n'.format(pformat(stats_json))) + + if T: + conf['stats_cb'] = stats_cb + conf['statistics.interval.ms'] = T + + consume_workload(conf, topics) diff --git a/tutorials/oauth/python/producer.py b/tutorials/oauth/python/producer.py index f357e12..125cd9f 100644 --- a/tutorials/oauth/python/producer.py +++ b/tutorials/oauth/python/producer.py @@ -1,56 +1,23 @@ #!/usr/bin/env python # # Copyright (c) Microsoft Corporation. All rights reserved. -# Copyright 2016 Confluent Inc. +# Copyright 2023 Confluent Inc. # Licensed under the MIT License. 
# Licensed under the Apache License, Version 2.0 # # Original Confluent sample modified for use with Azure Event Hubs for Apache Kafka Ecosystems -from azure.identity import DefaultAzureCredential from confluent_kafka import Producer import sys -from functools import partial +import argparse +import config_utils -def oauth_cb(cred, namespace_fqdn, config): - # confluent_kafka requires an oauth callback function to return (str, float) with the values of (, ) - - # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc - # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net' - # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param - - access_token = cred.get_token('https://%s/.default' % namespace_fqdn) - return access_token.token, access_token.expires_on - - -if __name__ == '__main__': - if len(sys.argv) != 3: - sys.stderr.write('Usage: %s \n' % sys.argv[0]) - sys.exit(1) - - namespace = sys.argv[1] - topic = sys.argv[2] - - # Azure credential - # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview - cred = DefaultAzureCredential() - - # Producer configuration - # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md - conf = { - 'bootstrap.servers': '%s:9093' % namespace, - - # Required OAuth2 configuration properties - 'security.protocol': 'SASL_SSL', - 'sasl.mechanism': 'OAUTHBEARER', - # the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function - 'oauth_cb': partial(oauth_cb, cred, namespace), - } - +def produce_workload(conf, topic, num_messages): # Create Producer instance p = Producer(**conf) + # Optional per-message delivery callback (triggered by poll() or flush()) # when a message has been successfully delivered or permanently # failed delivery (after retries). 
@@ -59,10 +26,10 @@ def delivery_callback(err, msg): sys.stderr.write('%% Message failed delivery: %s\n' % err) else: sys.stderr.write('%% Message delivered to %s [%d] @ %d\n' % - (msg.topic(), msg.partition(), msg.offset())) + (msg.topic(), msg.partition(), msg.offset())) - # Write 1-100 to topic - for i in range(0, 100): + # Write 1-records_num to topic + for i in range(num_messages): try: p.produce(topic, str(i), callback=delivery_callback) except BufferError: @@ -78,3 +45,25 @@ def delivery_callback(err, msg): # Wait until all messages have been delivered sys.stderr.write('%% Waiting for %d deliveries\n' % len(p)) p.flush() + + +def parse_producer_args(): + parser = argparse.ArgumentParser(description='Process command line arguments.') + parser.add_argument('namespace', help='Eventhubs namespace') + parser.add_argument('topic', help='Topic or Event Hub') + parser.add_argument('--mode', default='azure', + choices=['azure', 'oidc', 'opaque'], + help='Optional confluent producer configuration mode - azure, oidc, opaque') + parser.add_argument('--num-messages', type=int, default=100, + help='Optional number of messages to be produced') + + args = parser.parse_args() + return args.namespace, args.topic, args.mode, args.num_messages + + +if __name__ == '__main__': + namespace, topic, mode, num_messages = parse_producer_args() + + conf = config_utils.get_config(namespace, mode) + + produce_workload(conf, topic, num_messages=num_messages) \ No newline at end of file