Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish message to Kafka topic (KRaft) with Dapr C# #1291

Open
abdul-hafeel opened this issue May 17, 2024 · 1 comment
Open

Publish message to Kafka topic (KRaft) with Dapr C# #1291

abdul-hafeel opened this issue May 17, 2024 · 1 comment
Labels
question Further information is requested

Comments

@abdul-hafeel
Copy link

abdul-hafeel commented May 17, 2024

I'm new to Dapr as well as Kafka. Currently, for my .NET 8 desktop app, I want to implement Dapr with Kafka(KRaft), for that I'm working on POC, there I'm facing issues that:

'Dapr.DaprException: 'Publish operation failed: the Dapr endpoint indicated a failure'. ({"Status(StatusCode="Unavailable", Detail="Error connecting to subchannel.", DebugException="System.Net.Sockets.SocketException: No connection could be made because the target machine actively refused it.")"}).

docker-compose.yaml for Kafka image in Docker

`version: "3.8"

services:
kafka:
image: docker.io/bitnami/kafka:3.7
container_name: kafka
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
- "./kafka.keystore.jks:/bitnami/kafka/config/certs/kafka.keystore.jks"
- "./kafka.truststore.jks:/bitnami/kafka/config/certs/kafka.truststore.jks"
environment:
# KRaft settings
- KAFKA_ENABLE_KRAFT=yes
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
# Additional settings
- KAFKA_ADVERTISED_HOST_NAME=127.0.0.1
- KAFKA_CREATE_TOPICS="kafka-topic:1:1"

volumes:
kafka_data:
driver: local`

Program.cs

using Dapr.Client;

class Program
{
    static async Task Main(string[] args)
    {
        var services = new ServiceCollection();
        ConfigureServices(services);
        var serviceProvider = services.BuildServiceProvider();
        var daprClient = serviceProvider.GetRequiredService<DaprClient>();

        var message = new HelloMessage { Text = "Hello, Kafka!" };
        await PublishMessageEvent(daprClient, message);
    }

    //const string topicName = "kafka-topic";

    const string PUBSUB_NAME = "kafka-pubsub";
    const string TOPIC_NAME = "kafka-topic";
    static async Task PublishMessageEvent(DaprClient daprClient, HelloMessage message)
    {
        try
        {
            await daprClient.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, message);

            Console.WriteLine("Event published successfully.");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error publishing event: {ex.Message}");
        }
    }
    static void ConfigureServices(IServiceCollection services)
    {
        string daprPort = Environment.GetEnvironmentVariable("DAPR_GRPC_PORT")!;
        var grpcEndpoint = $"http://localhost:{daprPort}";
        //var grpcEndpoint = $"http://localhost:52540";

        // var channel = GrpcChannel.ForAddress("http://localhost:5009/");
        // Register DaprClient instance using gRPC channel and DaprClientBuilder
        var daprClient = new DaprClientBuilder()
            .UseGrpcEndpoint(grpcEndpoint) // Specify the gRPC endpoint
            .Build();

        services.AddSingleton(daprClient);
    }
}

public class HelloMessage
{
    public string Text { get; set; } = default!;
}

=================================================

var channel = GrpcChannel.ForAddress("http://localhost:5009/");
new DaprClientBuilder()
            .UseGrpcEndpoint(channel) // Specify the gRPC endpoint
            .Build();

In the above line, it is accepted as a string, not a channel.

Kafka pubsub yaml (Dapr configuration)

`apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: kafka-pubsub
namespace: default
spec:
type: pubsub.kafka
version: v1
metadata:

  • name: brokers
    value: "localhost:9092"
  • name: consumerGroup
    value: "my-group"
  • name: authRequired
    value: "false"
  • name: disableTls
    value: "true"`

Here I have attached tried code and configuration(s). I have turned off the firewall as well. Even though I am facing the same issue. Kindly help me out what I am missing here. Thanks in advance.

@philliphoff
Copy link
Collaborator

@abdul-hafeel My guess would be it's one of two things:

  • The Kafka container isn't getting its ports mapped in the way that Dapr expects. I'd check to see whether you can, independent of Dapr, connect and publish to the running Kafka container. That would help you understand whether it's a Dapr or Docker issue.

  • Dapr isn't getting started with ports configured in the way that the application expects. Can you show the command used to start Dapr and your app, and then show the Dapr logs (and it can help to start Dapr with additional logging, --log-level debug?

@philliphoff philliphoff added the question Further information is requested label Jun 18, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
question Further information is requested
Projects
None yet
Development

No branches or pull requests

2 participants