rd_kafka_abort_transaction: Failed to end transaction: Broker: Producer attempted a transactional operation in an invalid state #4849

arnaud-lb opened this issue Sep 18, 2024 · 0 comments

Description

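rd_kafka_abort_transaction() intermittently fails with INVALID_TXN_STATE ("Broker: Producer attempted a transactional operation in an invalid state"), even though the preceding rd_kafka_init_transactions() and rd_kafka_begin_transaction() calls succeeded.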

How to reproduce

Reproducer:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/syslog.h>
#include <librdkafka/rdkafka.h>

void setconf(rd_kafka_conf_t *conf, const char *name, const char *value) {
    char errstr[256];
    if (rd_kafka_conf_set(conf, name, value, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        fprintf(stderr, "rd_kafka_conf_set: %s\n", errstr);
        exit(1);
    }
}

void produce(rd_kafka_topic_t *rkt) {
    for (int i = 0; i < 5; i++) {
        char payload[128];
        snprintf(payload, sizeof(payload), "Message %d", i);
        int ret = rd_kafka_produce(rkt,
                RD_KAFKA_PARTITION_UA,
                RD_KAFKA_MSG_F_COPY,
                payload, strlen(payload),
                NULL, 0,
                NULL);
        if (ret != 0) {
            fprintf(stderr, "rd_kafka_produce: %d\n", ret);
            exit(1);
        }
    }
}

/* Delivery report callback; intentionally empty, it only needs to be registered. */
void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
{
}

int main(void) {
    char errstr[256];
    rd_kafka_error_t *err;

    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    setconf(conf, "log_level", "7" /* LOG_DEBUG */);
    setconf(conf, "debug", "all");
    setconf(conf, "metadata.broker.list", "localhost:9092");
    setconf(conf, "transactional.id", "dummy");
    
    rd_kafka_conf_set_opaque(conf, (void*)123);
    rd_kafka_conf_set_dr_msg_cb(conf, kafka_conf_dr_msg_cb);

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "rd_kafka_new: %s\n", errstr);
        exit(1);
    }

    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "myTopic", NULL);
    if (!rkt) {
        fprintf(stderr, "rd_kafka_topic_new: unknown error\n");
        exit(1);
    }

    err = rd_kafka_init_transactions(rk, 3000);
    if (err != NULL) {
        fprintf(stderr, "rd_kafka_init_transactions: %s\n", rd_kafka_error_string(err));
        exit(1);
    }
    
    // fprintf(stderr, "Transactions initialized\n");

    for (;;) {
        err = rd_kafka_begin_transaction(rk);
        if (err != NULL) {
            fprintf(stderr, "rd_kafka_begin_transaction: %s\n", rd_kafka_error_string(err));
            exit(1);
        }

        produce(rkt);
        // rd_kafka_poll(rk, 0);

        err = rd_kafka_commit_transaction(rk, -1);
        if (err != NULL) {
            fprintf(stderr, "rd_kafka_commit_transaction: %s\n", rd_kafka_error_string(err));
            exit(1);
        }

        // rd_kafka_poll(rk, 0);

        err = rd_kafka_begin_transaction(rk);
        if (err != NULL) {
            fprintf(stderr, "rd_kafka_begin_transaction: %s\n", rd_kafka_error_string(err));
            exit(1);
        }

        produce(rkt);
        // rd_kafka_poll(rk, 0);

        err = rd_kafka_abort_transaction(rk, -1);
        if (err != NULL) {
            fprintf(stderr, "rd_kafka_abort_transaction: %s\n", rd_kafka_error_string(err));
            exit(1);
        }
        
        rd_kafka_poll(rk, 0);
    }

    return 0;
}

Build and run:

$ docker run -it --rm -p 9092:9092 -e KAFKA_LOG4J_LOGGERS="kafka=DEBUG" apache/kafka:3.8.0
$ cc -o bug bug.c -lrdkafka
$ ./bug >log 2>&1

On my machine the program exits with the error above in less than 10 seconds. If it doesn't, restarting it while generating background load on the machine (for example, by rebuilding librdkafka) helps.
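
Since the reproducer loops forever when the failure does not trigger, the restarts can be automated. The following is only a rough sketch, not part of the original reproducer: run_until_status() is a hypothetical helper name, and it assumes a POSIX system where system() runs the command through the shell. It reruns a command until it exits with a given status.

```c
#include <stdlib.h>
#include <sys/wait.h>

/*
 * Hypothetical helper (not part of librdkafka or the reproducer).
 * Runs `cmd` through the shell up to `max_attempts` times.
 * Returns the 1-based attempt on which the command exited normally
 * with `want_status`, or 0 if that never happened.
 */
int run_until_status(const char *cmd, int want_status, int max_attempts)
{
    for (int attempt = 1; attempt <= max_attempts; attempt++) {
        int st = system(cmd);

        /* system() returns -1 if the child process could not be created. */
        if (st != -1 && WIFEXITED(st) && WEXITSTATUS(st) == want_status)
            return attempt;
    }
    return 0;
}
```

For example, assuming the timeout utility from GNU coreutils is installed, run_until_status("timeout 10 ./bug >log 2>&1", 1, 20) would restart the reproducer every 10 seconds until one run exits with status 1 (the reproducer's exit(1) path). Runs killed by timeout exit with status 124 and are retried. Each attempt overwrites the log file, so it ends up holding the output of the last attempt.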
