Kafka consumers disconnect after a time #20

Closed
florian-besser opened this issue Jan 6, 2023 · 22 comments
Labels
wontfix This will not be worked on

Comments

@florian-besser

florian-besser commented Jan 6, 2023

We are using the new MSK IAM mechanism, and when we start our app it connects correctly and consumes messages successfully.

After a time it prints the following log message and disconnects from the consumer group:

Jan 5, 2023 @ 20:01:50.782
NS: Consumer, label: INFO, message: Stopped
Jan 5, 2023 @ 20:01:50.714
NS: Consumer, label: ERROR, message: Crash: KafkaJSNonRetriableError: Cannot change principals during re-authentication from IAM.arn:aws:sts::822<censored>:assumed-role/<censored>_backend/aws-sdk-js-session-1672916517755: IAM.arn:aws:sts::822<censored>:assumed-role/<censored>_backend/aws-sdk-js-session-1672920109713
Jan 5, 2023 @ 20:01:50.672
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 5, 2023 @ 20:01:50.672
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 5, 2023 @ 20:01:50.456
NS: SaslAuthenticator-AWS_MSK_IAM, label: ERROR, message: Cannot change principals during re-authentication from IAM.arn:aws:sts::822<censored>:assumed-role/<censored>_backend/aws-sdk-js-session-1672916517755: IAM.arn:aws:sts::822<censored>:assumed-role/<censored>_backend/aws-sdk-js-session-1672920109713
Jan 5, 2023 @ 19:42:11.351
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 5, 2023 @ 19:42:11.351
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful

I suspect this is a permissions issue. Our app runs in a Kubernetes (EKS) environment and uses a service account. This is the trust policy for assuming the role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Federated": "arn:aws:iam::822<censored>:oidc-provider/oidc.eks.ap-southeast-1.amazonaws.com/id/0171B605AE128C1216D47DF764678130"
            },
            "Action": "sts:AssumeRoleWithWebIdentity",
            "Condition": {
                "StringEquals": {
                    "oidc.eks.ap-southeast-1.amazonaws.com/id/0171B605AE128C1216D47DF764678130:aud": "sts.amazonaws.com",
                    "oidc.eks.ap-southeast-1.amazonaws.com/id/0171B605AE128C1216D47DF764678130:sub": "system:serviceaccount:<censored>:backend"
                }
            }
        }
    ]
}

The actual role then has the following permissions:

{
    "Statement": [
        {
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:DescribeCluster",
                "kafka-cluster:DescribeClusterDynamicConfiguration",
                "kafka-cluster:WriteDataIdempotently"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:kafka:ap-southeast-1:822<censored>:cluster/kafka/7abf5f68-a330-40e0-8f99-bada163d0e84-5"
            ]
        },
        {
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:kafka:ap-southeast-1:822<censored>:topic/kafka/*/<censored>.*"
            ]
        },
        {
            "Action": [
                "kafka-cluster:*Group*"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:kafka:ap-southeast-1:822<censored>:group/kafka/*/<censored>.*"
            ]
        }
    ],
    "Version": "2012-10-17"
}
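The topic and group resources above are scoped with IAM wildcards. To check which ARNs a pattern such as `topic/kafka/*/<censored>.*` covers, here is a rough, simplified matcher. It assumes `*` matches any run of characters, `?` matches exactly one character, and every other character (including `.`) is literal. The account ID `123456789012` and tenant prefix `acme` are placeholders for the censored values, and `iamPatternToRegExp` is a hypothetical helper, not part of any AWS library.

```javascript
// Simplified model of IAM resource wildcards (an assumption, not the full
// IAM evaluation logic): '*' matches any run of characters, '?' matches
// exactly one character, and every other character, including '.', is literal.
function iamPatternToRegExp(pattern) {
  const body = Array.from(pattern, (ch) => {
    if (ch === '*') return '.*';
    if (ch === '?') return '.';
    // Escape characters that have a special meaning in regular expressions.
    return ch.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  }).join('');
  return new RegExp(`^${body}$`);
}

// Placeholder account ID and tenant prefix standing in for <censored>.
const topicPattern = iamPatternToRegExp(
  'arn:aws:kafka:ap-southeast-1:123456789012:topic/kafka/*/acme.*'
);
const clusterPath =
  'arn:aws:kafka:ap-southeast-1:123456789012:topic/kafka/7abf5f68-a330-40e0-8f99-bada163d0e84-5';

console.log(topicPattern.test(`${clusterPath}/acme.orders`)); // true
console.log(topicPattern.test(`${clusterPath}/acme-orders`)); // false: '.' is literal
```

Under these assumptions, a `<prefix>.*` resource only covers topic (or group) names that continue with a literal dot after the prefix.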

Since the app can consume messages correctly on startup, I assume the permissions are fine initially. Are additional permissions needed for the re-authentication flow? If so, is there documentation describing what is needed?

Lastly, this is our client code:

{
  clientId: `${process.env.TENANT}.backend-producer`,
  brokers: process.env.KAFKA_BROKERS.split(','),
  sasl: {
    mechanism: Type,
    authenticationProvider: awsIamAuthenticator(
      process.env.AWS_REGION,
      '900',
    ),
  },
  ssl: true,
};
@florian-besser
Author

I reduced the TTL to 10 minutes to see what happens. Here are our logs:

Jan 6, 2023 @ 12:20:45.634
NS: Consumer, label: INFO, message: Stopped
Jan 6, 2023 @ 12:20:45.558
NS: Consumer, label: ERROR, message: Crash: KafkaJSNonRetriableError: Cannot change principals during re-authentication from IAM.arn:aws:sts::<censored>:assumed-role/<censored>_backend/aws-sdk-js-session-1672975250038: IAM.arn:aws:sts::<censored>:assumed-role/<censored>_backend/aws-sdk-js-session-1672978844372
Jan 6, 2023 @ 12:20:45.476
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 12:20:45.476
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 12:20:45.267
NS: SaslAuthenticator-AWS_MSK_IAM, label: ERROR, message: Cannot change principals during re-authentication from IAM.arn:aws:sts::<censored>:assumed-role/<censored>_backend/aws-sdk-js-session-1672975250038: IAM.arn:aws:sts::<censored>:assumed-role/<censored>_backend/aws-sdk-js-session-1672978844372
Jan 6, 2023 @ 12:16:08.451
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 12:16:08.451
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 12:06:04.641
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 12:06:04.641
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 11:56:02.444
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 11:56:02.444
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 11:45:59.298
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 11:45:59.298
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 11:40:58.333
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 11:40:58.333
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 11:30:55.846
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 11:30:55.846
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 11:20:54.488
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 11:20:54.488
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 11:20:54.199
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 11:20:54.199
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 11:20:54.190
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 11:20:54.190
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 11:20:53.964
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 11:20:53.964
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 11:20:53.411
NS: ConsumerGroup, label: INFO, message: Consumer has joined the group
Jan 6, 2023 @ 11:20:53.411
NS: undefined, label: WARN, message: KafkaJS v2.0.0 switched default partitioner. To retain the same partitioning behavior as in previous versions, create the producer with the option "createPartitioner: Partitioners.LegacyPartitioner". See the migration guide at https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner for details. Silence this warning by setting the environment variable "KAFKAJS_NO_PARTITIONER_WARNING=1"
Jan 6, 2023 @ 11:20:53.411
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 11:20:53.411
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 11:20:53.411
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 11:20:53.411
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful
Jan 6, 2023 @ 11:20:53.411
NS: Consumer, label: INFO, message: Starting
Jan 6, 2023 @ 11:20:53.411
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: Authentication response
Jan 6, 2023 @ 11:20:53.411
NS: SaslAuthenticator-AWS_MSK_IAM, label: INFO, message: SASL Simon authentication successful

In short, successful re-authentications appear roughly every 10 minutes, but at the one-hour mark something fails irrecoverably. My current guess is an AWS session time limit of 1 hour?
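The numeric suffixes of the two session names in the error above look like millisecond Unix timestamps. Assuming they are, a small sketch (the helper `sessionTimestamps` is hypothetical) computes the gap between them:

```javascript
// Hypothetical helper: pull the numeric suffix out of each
// "aws-sdk-js-session-<n>" name in an error message, assuming the suffix
// is a millisecond Unix timestamp.
function sessionTimestamps(message) {
  const matches = message.matchAll(/aws-sdk-js-session-(\d+)/g);
  return Array.from(matches, (m) => Number(m[1]));
}

const error =
  'Cannot change principals during re-authentication from ' +
  'IAM.arn:aws:sts::<censored>:assumed-role/<censored>_backend/aws-sdk-js-session-1672975250038: ' +
  'IAM.arn:aws:sts::<censored>:assumed-role/<censored>_backend/aws-sdk-js-session-1672978844372';

const [before, after] = sessionTimestamps(error);
const gapMs = after - before;
console.log(`${gapMs} ms = ${(gapMs / 60000).toFixed(1)} minutes`);
// prints "3594334 ms = 59.9 minutes"
```

That is just under an hour, which fits the one-hour guess. The first log excerpt gives a similar gap (1672920109713 - 1672916517755 = 3591958 ms, also about 59.9 minutes).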

@florian-besser
Author

florian-besser commented Jan 10, 2023

I dug a bit deeper and found the following chain of events:
awsIamAuthenticator returns an authenticate function that constructs an AuthenticationPayloadCreator on every call.

This AuthenticationPayloadCreator calls the AWS SDK methods defaultProvider / getDefaultRoleAssumerWithWebIdentity to assume the default role.
It then sends the resulting authentication request to Kafka.

On the Kafka side, SaslServerAuthenticator.ReauthInfo.ensurePrincipalUnchanged() contains:

if (!previousKafkaPrincipal.equals(reauthenticatedKafkaPrincipal)) {
    throw new SaslAuthenticationException(String.format(
            "Cannot change principals during re-authentication from %s.%s: %s.%s",
            previousKafkaPrincipal.getPrincipalType(), previousKafkaPrincipal.getName(),
            reauthenticatedKafkaPrincipal.getPrincipalType(), reauthenticatedKafkaPrincipal.getName()));
}

In other words, if the principal changes, Kafka rejects the re-authentication.
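To make the broker-side rule concrete, here is a JavaScript transliteration of the Java check, for illustration only; the real check runs inside the Kafka broker. It assumes principal equality compares the principal type and name, mirroring `getPrincipalType()` and `getName()` in the snippet. The account ID `123456789012` and role name `example_backend` are placeholders.

```javascript
// Illustrative port of Kafka's ensurePrincipalUnchanged() check.
class SaslAuthenticationException extends Error {}

// A principal is modeled here as { type, name }.
function ensurePrincipalUnchanged(previous, reauthenticated) {
  const same =
    previous.type === reauthenticated.type &&
    previous.name === reauthenticated.name;
  if (!same) {
    throw new SaslAuthenticationException(
      `Cannot change principals during re-authentication from ` +
        `${previous.type}.${previous.name}: ` +
        `${reauthenticated.type}.${reauthenticated.name}`
    );
  }
}

// Placeholder role ARN; only the session-name suffix differs below.
const role = 'arn:aws:sts::123456789012:assumed-role/example_backend';
const first = { type: 'IAM', name: `${role}/aws-sdk-js-session-1672975250038` };
const refreshed = { type: 'IAM', name: `${role}/aws-sdk-js-session-1672978844372` };

ensurePrincipalUnchanged(first, { ...first }); // same name: accepted
try {
  ensurePrincipalUnchanged(first, refreshed); // new session name: rejected
} catch (err) {
  // Same shape as the "Cannot change principals ..." message in the logs.
  console.log(err.message);
}
```

Because the session name is the last segment of the assumed-role ARN, a new session name alone is enough to produce a different principal.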

So I looked into how to stop the AWS SDK from producing a different principal name each time it requests a session.
In the AWS SDK code I found a function called fromWebToken, which appears to create sessions named RoleSessionName: roleSessionName ?? `aws-sdk-js-session-${Date.now()}`.
This is the only occurrence of the string aws-sdk-js-session- that I found in the AWS SDK codebase, and that exact string appears in the error message above.

fromWebToken is called from resolveTokenFile, which sets roleSessionName to init?.roleSessionName ?? process.env["AWS_ROLE_SESSION_NAME"];
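The resolution order described above can be sketched as follows. This is a hypothetical model, not the AWS SDK's actual code: an explicit option wins, then the AWS_ROLE_SESSION_NAME environment variable, then a default built from the current timestamp. `resolveRoleSessionName` and its `now` parameter are illustrative.

```javascript
// Hypothetical model of the session-name fallback chain traced above.
function resolveRoleSessionName(init, env, now) {
  const roleSessionName = init?.roleSessionName ?? env.AWS_ROLE_SESSION_NAME;
  return roleSessionName ?? `aws-sdk-js-session-${now}`;
}

const start = 1672975250038;
const oneHourLater = start + 60 * 60 * 1000;

// Without the variable, each new session gets a different name.
console.log(resolveRoleSessionName({}, {}, start));        // aws-sdk-js-session-1672975250038
console.log(resolveRoleSessionName({}, {}, oneHourLater)); // aws-sdk-js-session-1672978850038

// With a static AWS_ROLE_SESSION_NAME the name never changes.
const env = { AWS_ROLE_SESSION_NAME: 'backend' };
console.log(resolveRoleSessionName({}, env, start));        // backend
console.log(resolveRoleSessionName({}, env, oneHourLater)); // backend
```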

I will try setting the AWS_ROLE_SESSION_NAME env variable to a static string and report back on how that turns out.

@florian-besser
Author

Setting AWS_ROLE_SESSION_NAME has solved the issue for us; we ran successfully throughout the night without disconnects. I'm closing this for now but maybe @jmaver-plume if you're agreeable I could raise a PR against the documentation to mention this edge case?

I feel there might be other users like me running around that do not have this variable set.

@tylermichael

tylermichael commented Mar 30, 2023

I'm also running into this. I get this "Cannot change principals during re-authentication" error every hour.

@jmaver-plume I would greatly appreciate any guidance you might have here.

As I understand @florian-besser's analysis, this happens because of logic inside the Kafka SASL server. Setting this environment variable keeps the session name constant, sidestepping this particular exception.

It feels like this library should accept the session name as a parameter (possibly even generating a static, unique default value?), or at the very least document that you should set it as an environment variable, as @florian-besser mentioned.
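The "static unique default" idea could look roughly like this. `defaultRoleSessionName` is a hypothetical helper, not part of this library: it generates a name once per process using Node's `crypto.randomUUID` (available since Node 14.17) and returns the cached value afterwards, so the principal stays stable across credential refreshes while different processes still get distinct names. The 2-64 character limit and allowed character set follow the STS `RoleSessionName` constraints.

```javascript
const { randomUUID } = require('crypto');

// Hypothetical: generate a role session name once per process and reuse it.
let cachedSessionName;

function defaultRoleSessionName(prefix = 'kafkajs-msk-iam') {
  // The prefix only matters on the first call; later calls return the cache.
  if (cachedSessionName === undefined) {
    const raw = `${prefix}-${randomUUID()}`;
    // STS accepts 2-64 characters from [\w+=,.@-]; replace anything else
    // and truncate to the maximum length.
    cachedSessionName = raw.replace(/[^\w+=,.@-]/g, '-').slice(0, 64);
  }
  return cachedSessionName;
}

console.log(defaultRoleSessionName() === defaultRoleSessionName()); // true
```

A fixed, human-readable name set through AWS_ROLE_SESSION_NAME satisfies the broker's check equally well; a generated default only matters when nobody sets one.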

@mikehall314

I have the same issue, though setting AWS_ROLE_SESSION_NAME doesn't seem to have solved it for us.

@jfr992

jfr992 commented May 3, 2023

AWS_ROLE_SESSION_NAME worked for me as well; it should be added to the mechanism.

@anilspydra

anilspydra commented May 4, 2023

@florian-besser
After adding AWS_ROLE_SESSION_NAME, authentication succeeds, but I'm facing an issue with the broker pool and getting timed out:

{"level":"ERROR","timestamp":"2023-05-04T14:25:02.229Z","logger":"kafkajs","message":"[SaslAuthenticator-AWS_MSK_IAM] Request SaslAuthenticate(key: 36, version: 1) timed out","broker":"b-1.ap-south-1.amazonaws.com"}
{"level":"ERROR","timestamp":"2023-05-04T14:25:02.230Z","logger":"kafkajs","message":"[BrokerPool] Request SaslAuthenticate(key: 36, version: 1) timed out","retryCount":0,"retryTime":535}
{"level":"WARN","timestamp":"2023-05-04T14:25:03.019Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"blockchain-microservice","broker":"b-1.ap-south-1.amazonaws.com","correlationId":2}
{"level":"ERROR","timestamp":"2023-05-04T14:25:08.052Z","logger":"kafkajs","message":"[SaslAuthenticator-AWS_MSK_IAM] Request SaslAuthenticate(key: 36, version: 1) timed out","broker":"b-1.ap-south-1.amazonaws.com"}
{"level":"ERROR","timestamp":"2023-05-04T14:25:08.053Z","logger":"kafkajs","message":"[BrokerPool] Request SaslAuthenticate(key: 36, version: 1) timed out","retryCount":1,"retryTime":1214}
{"level":"WARN","timestamp":"2023-05-04T14:25:09.896Z","logger":"kafkajs","message":"[RequestQueue] Response without match","clientId":"blockchain-microservice","broker":"b-1.ap-south-1.amazonaws.com","correlationId":4}
{"level":"INFO","timestamp":"2023-05-04T14:25:14.134Z","logger":"kafkajs","message":"[SaslAuthenticator-AWS_MSK_IAM] Authentication response","authenticateResponse":{"version":"2020_10_22","request-id":"4a26ca26-38fa-483a-bf2f-bd9e07f5a3b2"}}
{"level":"INFO","timestamp":"2023-05-04T14:25:14.134Z","logger":"kafkajs","message":"[SaslAuthenticator-AWS_MSK_IAM] SASL AWS_MSK_IAM authentication successful","broker":"b-1.ap-south-1.amazonaws.com"}
2023-05-04 19:55:19:5519 - [debug] - : error-in-consumer: Request Metadata(key: 3, version: 6) timed out

@jfr992

jfr992 commented May 4, 2023

@anilspydra did you check the security group?

@anilspydra

anilspydra commented May 4, 2023

> @anilspydra did you check the security group?
@jfr992
I didn't follow. Should I check which security groups are attached to the AWS MSK cluster?

@jfr992

jfr992 commented May 4, 2023

@anilspydra I mean that a timeout could be caused by the MSK cluster's security group. Make sure it allows port 9098, and set your brokers env var (or value) to broker-1:9098, since 9096 is for SCRAM.

@anilspydra

anilspydra commented May 4, 2023

> @anilspydra I mean, a timeout could be the security group of the MSK cluster, make sure to add port 9098 to it, and set your brokers env var or value to broker-1:9098, since 9096 is for SCRAM.

@jfr992
The broker value is currently broker-1:9198; would that suffice?

@jfr992

jfr992 commented May 4, 2023

@anilspydra (https://docs.aws.amazon.com/msk/latest/developerguide/port-info.html) In that case I don't know; something is not letting the connection through.

@anilspydra

anilspydra commented May 4, 2023

> @anilspydra should be 9098 for AWS

Yes, that is for the private endpoint; when the cluster is made public, the port shown is 9198.
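For reference, a small lookup of MSK broker ports by client authentication method, assuming the numbers on the AWS port-information page linked above: 9098 for IAM from within AWS and 9198 for IAM over public access (matching this exchange), 9096/9196 for SASL/SCRAM, and 9094/9194 for TLS. `mskBrokerPort`, `brokerList`, and the host name are hypothetical.

```javascript
// MSK broker ports by authentication method: private (within AWS) vs public access.
const MSK_PORTS = {
  tls: { private: 9094, public: 9194 },
  scram: { private: 9096, public: 9196 },
  iam: { private: 9098, public: 9198 },
};

function mskBrokerPort(auth, publicAccess = false) {
  const ports = MSK_PORTS[auth];
  if (!ports) throw new Error(`unknown auth method: ${auth}`);
  return publicAccess ? ports.public : ports.private;
}

// Append the matching port to bare broker host names.
function brokerList(hosts, auth, publicAccess = false) {
  const port = mskBrokerPort(auth, publicAccess);
  return hosts.map((host) => `${host}:${port}`);
}

// Hypothetical host name; IAM over public access resolves to port 9198.
console.log(brokerList(['b-1.example.kafka.ap-south-1.amazonaws.com'], 'iam', true));
```

A correct port still needs the security group to allow it, so both are worth checking when SaslAuthenticate requests time out.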

@anilspydra

anilspydra commented May 4, 2023

@jfr992
Do I need to change any timeouts when initializing the Kafka client in Node.js? Any idea what the ideal configuration would be?

@jmaver-plume
Owner

> Setting AWS_ROLE_SESSION_NAME has solved the issue for us; we ran successfully throughout the night without disconnects. I'm closing this for now but maybe @jmaver-plume if you're agreeable I could raise a PR against the documentation to mention this edge case?
>
> I feel there might be other users like me running around that do not have this variable set.

Yes, please create a PR.

@jmaver-plume jmaver-plume reopened this May 7, 2023
@cion-yatindra

@florian-besser @jmaver-plume can you please explain where and how to set the AWS_ROLE_SESSION_NAME variable, and what its value should be?

@jfr992

jfr992 commented Jun 1, 2023

@cion-yatindra It should be an environment variable; if you are running the code in a container, set it in the container's environment. Locally it can be set with: export AWS_ROLE_SESSION_NAME='session-name'
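In Kubernetes the variable would typically go in the `env` list of the Deployment's container spec. If that is not an option, it can also be pinned from inside the process. A minimal sketch, assuming (as traced earlier in the thread) that the SDK reads AWS_ROLE_SESSION_NAME from `process.env` when it resolves credentials; `pinRoleSessionName` and the name `backend-consumer` are hypothetical:

```javascript
// Hypothetical helper: make sure AWS_ROLE_SESSION_NAME is set before any
// AWS credentials are resolved. An existing value (for example one set in
// the container environment) is left untouched.
function pinRoleSessionName(env, fallbackName) {
  // STS accepts 2-64 characters from [\w+=,.@-] for a role session name.
  if (!/^[\w+=,.@-]{2,64}$/.test(fallbackName)) {
    throw new Error(`invalid role session name: ${fallbackName}`);
  }
  if (!env.AWS_ROLE_SESSION_NAME) {
    env.AWS_ROLE_SESSION_NAME = fallbackName;
  }
  return env.AWS_ROLE_SESSION_NAME;
}

// Call this at startup, before the Kafka client is created.
pinRoleSessionName(process.env, 'backend-consumer');
```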

@stale

stale bot commented Jul 31, 2023

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@akospaska

> AWS_ROLE_SESSION_NAME

Hey,

Is there a detailed explanation of why setting AWS_ROLE_SESSION_NAME solves the issue?

I had the same problem, and now it works fine, but I need an exact answer as to why this is the only way.

Thank you for your help in advance!

@florian-besser
Author

@akospaska I believe I provided more details in my first comment: #20 (comment)

If anything in there is unclear, I'm glad to help; the comment dives into the internals of the Kafka codebase as well as AWS quirks.

@akospaska

akospaska commented Dec 7, 2023

Hello @florian-besser,

First of all, I'm super glad you commented :-)

We want to use this solution in a production environment, so I want to gather all the information on why the client has to set AWS_ROLE_SESSION_NAME to a static value.

As far as I can see, in our case fromNodeProviderChain calls fromWebToken.

What is not clear is why we receive the principal error message.

If we don't set AWS_ROLE_SESSION_NAME, will fromWebToken set that env var?

I am just looking for a clear explanation I can give the other teams of why we have to set the AWS_ROLE_SESSION_NAME environment variable.

Thank you for your help in advance!

@florian-besser
Author

florian-besser commented Dec 8, 2023

@akospaska I believe my comment answered that:

> I found a method called fromWebToken which seems to generate sessions with the name RoleSessionName: roleSessionName ?? aws-sdk-js-session-${Date.now()}.

In short, by default fromWebToken generates a non-static RoleSessionName, which changes whenever a new session is created. The RoleSessionName is part of the principal Kafka sees, and Kafka cannot handle a principal name that changes during re-authentication.

To get a static RoleSessionName, you can set AWS_ROLE_SESSION_NAME or read the AWS SDK code for alternative ways.


8 participants