Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka OAUTHBEARER: Could not produce message to multiple Event hub namespaces in one application #200

Open
hoanthien8988 opened this issue Jun 15, 2022 · 9 comments

Comments

@hoanthien8988
Copy link

Description

Kafka OAUTHBEARER: Could not produce message to multiple Event hub namespaces in one application

How to reproduce

Hi experts,

I am using this sample: https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/tutorials/oauth/java/appsecret

I made some minor modifications in TestProducer class: add line 18:
executorService.execute(new TestDataReporter(producer_auto, TOPIC));
and line 26:
properties.load(new FileReader("src/main/resources/producer_auto.config"));

Purpose: I want to produce message to 2 different EVENT HUB namespaces (means creating 2 diffrent Kafka Producers for 2 bootstrap servers) in ONE console application, see full code:

public class TestProducer {
     //Change constant to send messages to the desired topic, for this example we use 'test'
     private final static String TOPIC = "do.kafka.oauth";
    
     private final static int NUM_THREADS = 1;
    
    
     public static void main(String... args) throws Exception {
         //Create Kafka Producer
         final Producer<Long, String> producer = createProducer(false);
         final Producer<Long, String> producer_auto = createProducer(true);
    
         final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS);
    
         //Run NUM_THREADS TestDataReporters
         for (int i = 0; i < NUM_THREADS; i++){
             executorService.execute(new TestDataReporter(producer, TOPIC));
             executorService.execute(new TestDataReporter(producer_auto, TOPIC));
         }
     }
    
     private static Producer<Long, String> createProducer(boolean isAuto) {
         try{
             Properties properties = new Properties();
             if(isAuto)
                 properties.load(new FileReader("src/main/resources/producer_auto.config"));
             else
                 properties.load(new FileReader("src/main/resources/producer.config"));
             properties.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer");
             properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
             properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
             return new KafkaProducer<>(properties);
         } catch (Exception e){
             System.out.println("Failed to create producer with exception: " + e);
             System.exit(0);
             return null; //unreachable
         }
     }
 }

Here is producer.config:

bootstrap.servers=advantcoeventhubs.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler

and producer_auto.config:

bootstrap.servers=autoeventhubtesting.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler

When I execute the code, the application can produce message to just the first namespace (advantcoeventhubs), and throw exception when produce message to the second namespace (autoeventhubtesting): "ERROR NetworkClient [Producer clientId=KafkaExampleProducer] Connection to node -1 (autoeventhubtesting.servicebus.windows.net/13.66.138.74:9093) failed authentication due to: Invalid SASL mechanism response, server may be expecting a different protocol"

Please see attached picture for error here: Invalid SASL mechanism response, server may be expecting a different protocol

All event hub namespaces are in Standard tier already. Can any experts advise the root cause and work around solution?
Thank you so much!

Has it worked previously?

It was working successfully when we used 1 separated application to produce message to 1 event hub namespace.
The issue only happens when we use ONE application to produce message to 2 different event hub namespaces.

@serkantkaraca
Copy link
Member

Can you run the original sample for both namespaces separately? This way you can identify if there is any RBAC issues.

@hoanthien8988
Copy link
Author

Can you run the original sample for both namespaces separately? This way you can identify if there is any RBAC issues.

Hi @serkantkaraca ,

Yes, I did as you said and mentioned it in the original post:

Has it worked previously?
It was working successfully when we used 1 separated application to produce message to 1 event hub namespace.

@serkantkaraca
Copy link
Member

I will try to reproduce this and update the thread with my findings.

@serkantkaraca
Copy link
Member

Issue seems to be related to Executors. Can you remove the executor and move the produce code into a static function?

@hoanthien8988
Copy link
Author

Hi @serkantkaraca ,

In the real business, I would think one application should be capable of producing/consuming messages to/from multiple namespaces in parallel, not sequentially.
By the way, I tried removing the Executors + producing one by one but still got the same error. Here is the main():

public static void main(String... args) throws Exception {
       //Create Kafka Producer
       final Producer<Long, String> producer = createProducer(false);
       final Producer<Long, String> producer_auto = createProducer(true);

       System.out.println(" Test Data #");
       final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, null, "Test Data #");
       producer.send(record, new Callback() {
           public void onCompletion(RecordMetadata metadata, Exception exception) {
               if (exception != null) {
                   System.out.println(exception);
                   System.exit(1);
               }
           }
       });

       System.out.println(" Test Data #auto");
       final ProducerRecord<Long, String> recordauto = new ProducerRecord<Long, String>(TOPIC, null, "Test Data #auto");
       producer_auto.send(record, new Callback() {
           public void onCompletion(RecordMetadata metadata, Exception exception) {
               if (exception != null) {
                   System.out.println(exception);
                   System.exit(1);
               }
           }
       });
   }

Error here:

"C:\Program Files\Java\jdk1.8.0_221\bin\java.exe"...
TestProducer
ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console...
TOKEN ACQUIRED
 Test Data #
ERROR NetworkClient [Producer clientId=KafkaExampleProducer] Connection to node -1 (autoeventhubtesting.servicebus.windows.net/40.78.250.67:9093) failed authentication due to: Invalid SASL mechanism response, server may be expecting a different protocol
 Test Data #auto
org.apache.kafka.common.errors.IllegalSaslStateException: Invalid SASL mechanism response, server may be expecting a different protocol

Process finished with exit code 1

I guess the CustomAuthenticateCallbackHandler can handle only 1 request from 1 namespace very slowly before being available again for the next request from another different namespace.

@serkantkaraca
Copy link
Member

Timing of the logs are not adding up. How is the failure for autoeventhubtesting making before "#auto" log line? Can you wait until the messages are flushed?

@hoanthien8988
Copy link
Author

hoanthien8988 commented Jun 22, 2022

Hi @serkantkaraca ,

You can reproduce the issue easily by using my main() in the previous comment, to debug where the failure's from. It seems the "producer_auto" could not be created right after creating "producer" successfully --> caused the exception before "#auto" log line.

By the way, if you already had a sample to consume/produce messages from/to multiple namespaces in ONE application successfullly, could you please share it?

Thank you!

@serkantkaraca
Copy link
Member

I was able to identify the issue. Kafka library doesn't allow creating multiple login managers when the callback handler is shared across different namespaces. I was able to solve the issue by creating 2 different handlers like CustomAuthenticateCallbackHandler1 and CustomAuthenticateCallbackHandler2.

Below is the Kafka code that causes the issue. Once it creates the first loginManager for the first broker, it skips for the second broker.

https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java#L104

@hoanthien8988
Copy link
Author

Hi @serkantkaraca ,

Thanks for your analysis!

I was able to solve the issue by creating 2 different handlers like CustomAuthenticateCallbackHandler1 and CustomAuthenticateCallbackHandler2

Could you provide a generic solution for that? A sample code would be greatly appreciated!
Because in real business, we have many dynamic namespaces that need to be produce/consume message in one application in parallel, and we cannot edit the code to add CustomAuthenticateCallbackHandler1, 2, 3, ...
If so, can you upload your solution somewhere (Github is ok) for our reference?

Thanks for your support!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants