Kafka persistent stream provider for Microsoft Orleans that uses the Confluent SDK. This provider has the added benefit that it allows external messages (not generated from orleans) to be merged with orleans streaming system to be consumed as if the messages were generated by orleans.
Orleans.Streams.Kafka
has the following dependencies:
To start working with the Orleans.Streams.Kafka
make sure you do the following steps:
- Install Kafka on a machine (or cluster) which you have access to use the Confluent Platform.
- Create Topics in Kafka with as many partitions as needed for each topic.
- Install the
Orleans.Streams.Kafka
nuget from the nuget repository. - Add to the Silo configuration the new stream provider with the necessary parameters and the optional ones (if you wish). you can see what is configurable in KafkaStreamProvider under Configurable Values.
Example KafkaStreamProvider configuration:
public class SiloBuilderConfigurator : ISiloBuilderConfigurator
{
public void Configure(ISiloBuilder hostBuilder)
=> hostBuilder
.AddMemoryGrainStorage("PubSubStore")
.AddKafka("KafkaStreamProvider")
.WithOptions(options =>
{
options.BrokerList = new [] {"localhost:8080"};
options.ConsumerGroupId = "E2EGroup";
options.ConsumeMode = ConsumeMode.StreamEnd;
options
.AddTopic(Consts.StreamNamespace)
.AddTopic(Consts.StreamNamespace2, new TopicCreationConfig { AutoCreate = true, Partitions = 2, ReplicationFactor = 1 })
.AddExternalTopic<TestModel>(Consts.StreamNamespaceExternal)
;
})
.AddJson()
.AddLoggingTracker()
.Build()
;
}
public class ClientBuilderConfigurator : IClientBuilderConfigurator
{
public virtual void Configure(IConfiguration configuration, IClientBuilder clientBuilder)
=> clientBuilder
.AddKafka("KafkaStreamProvider")
.WithOptions(options =>
{
options.BrokerList = new [] {"localhost:8080"};
options.ConsumerGroupId = "E2EGroup";
options
.AddTopic(Consts.StreamNamespace)
.AddTopic(Consts.StreamNamespace2, new TopicCreationConfig { AutoCreate = true, Partitions = 2, ReplicationFactor = 1 })
.AddExternalTopic<TestModel>(Consts.StreamNamespaceExternal)
;
})
.AddJson()
.Build()
;
}
var testGrain = clusterClient.GetGrain<ITestGrain>(grainId);
var result = await testGrain.GetThePhrase();
Console.BackgroundColor = ConsoleColor.DarkMagenta;
Console.WriteLine(result);
var streamProvider = clusterClient.GetStreamProvider("KafkaProvider");
var stream = streamProvider.GetStream<TestModel>("streamId", "topic1");
await stream.OnNextAsync(new TestModel
{
Greeting = "hello world"
});
var kafkaProvider = GetStreamProvider("KafkaStreamProvider");
var testStream = kafkaProvider.GetStream<TestModel>("streamId", "topic1");
// To resume stream in case of stream deactivation
var subscriptionHandles = await testStream.GetAllSubscriptionHandles();
if (subscriptionHandles.Count > 0)
{
foreach (var subscriptionHandle in subscriptionHandles)
{
await subscriptionHandle.ResumeAsync(OnNextTestMessage);
}
}
await testStream.SubscribeAsync(OnNextTestMessage);
Note: The Stream Namespace identifies the Kafka topic.
These are the configurable values that the Orleans.Streams.Kafka
:
- Topics: The topics that will be used where messages will be Produced/Consumed.
- BrokerList: List of Kafka brokers to connect to.
- ConsumerGroupId: The ConsumerGroupId used by the Kafka Consumer. Default value is
orleans-kafka
- PollTimeout: Determines the duration that the Kafka consumer blocks for to wait for messages. Default value is
100ms
- PollBufferTimeout: Determines the duration the
KafkaAdapterReceiver
will continue to poll for messages (for the same batch) Default value is500ms
- AdminRequestTimeout: Timeout for admin requests. Default value is
5s
- ConsumeMode: Determines the offset to start consuming from. Default value is
ConsumeMode.LastCommittedMessage
- ProducerTimeout: Timeout for produce requests. Default value is
5s
- RetentionPeriodInMs: Message retention period in milliseconds for kafka topic. Default value is 7 days