Skip to content

Commit

Permalink
Benchmark program update (#59)
Browse files Browse the repository at this point in the history
* Updates on KNetConsumer

* #53 (comment)

* Fixed some numbers

* #53: stdev becomes SD

* #53: updates on report

* #53: updates on report

* #53: extra tests
  • Loading branch information
masesdevelopers authored May 4, 2022
1 parent e27b637 commit 40e402c
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 37 deletions.
36 changes: 29 additions & 7 deletions src/net/Documentation/articles/performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,21 @@ The configuration is:

Here below a set of results, in bold the results which are better using KNet:

- KNet/Confluent.Kafka Produce Average ratio percentage (stdev ratio percentage):
- KNet/Confluent.Kafka Produce Average ratio percentage (SD ratio percentage):

| | 10 bytes | 100 bytes | 1000 bytes | 10000 bytes | 100000 bytes |
|--- |--- |--- |--- |--- |--- |
| 10 messages | **9,04 (4,34)** | **5,47 (3,1)** | **15,45 (5,29)** | **7,54 (4,38)** | **19,73 (4,23)** |
| 100 messages | **18,9 (18,9)** | **38,1 (8,1)** | **30,34 (5,44)** | **26 (3,04)** | **69,4 (13,09)** |
| 100 messages | **18,9 (6,29)** | **38,1 (8,1)** | **30,34 (5,44)** | **26 (3,04)** | **69,4 (13,09)** |
| 1000 messages | 197,73 (10,54) | 109,92 (6,13) | **57,6 (7,32)** | **52,71 (8,17)** | **75,76 (43,7)** |
| 10000 messages | 2102,28 (736,54) | 796,84 (514,28) | 173,39 (401,76) | 123,62 (620,46) | **99,5 (108,3)** |

- KNet/Confluent.Kafka Consume Average ratio percentage (stdev ratio percentage):
- KNet/Confluent.Kafka Consume Average ratio percentage (SD ratio percentage):

| | 10 bytes | 100 bytes | 1000 bytes | 10000 bytes | 100000 bytes |
|--- |--- |--- |--- |--- |--- |
| 10 messages | **85,93 (399,84)**| **85,41 (282,85)** | **85,14 (297,98)** | **24,07 (229,23)** | **36,23 (285,77)** |
| 100 messages | **94,54 (94,54)** | **94,7 (287,78)** | **68,49 (389,25)** | **71,97 (276,56)** | 108,57 (89,45) |
| 100 messages | **94,54 (479,13)** | **94,7 (287,78)** | **68,49 (389,25)** | **71,97 (276,56)** | 108,57 (89,45) |
| 1000 messages | 192,27 (906,94) | 521,86 (867,93) | 103,62 (1854,84) | 255,52 (287,33) | 163,24 (124,23) |
| 10000 messages | 9153,56 (77543,04) | 7948,76 (69701,75) | 3848,12 (23910,64) | 706,83 (3905,89) | 213,46 (1013,16) |

Expand All @@ -121,9 +121,9 @@ Looking at the above table KNet performs better than Confluent.Kafka with burst
The best produce performance was obtained with 10 messages of 100, or 10000, bytes: KNet is 20 times fast than Confluent.Kafka.
The best consume performance was obtained with 10 messages of 10000 bytes: KNet is 4 times fast than Confluent.Kafka.

### Stdev ratio percentage
### SD ratio percentage

Looking at value within the brackets that represents the ratio of the stdev it is possible to highlight that:
Looking at value within the brackets, that represents the ratio of the SD, it is possible to highlight that:
- in produce KNet has more stable measures except when the number of messages is high (10000 messages);
- in consume KNet has less stable measures.

Expand All @@ -135,4 +135,26 @@ Using KNetProducer the numbers of JNI invocation are less than using KafkaProduc
The same consideration can be applied on the consume side: KNetConsumer does not reduce the impact of JNI interface and it does not give any great improvement.
The JNI interface has an impact even when the number of messages is high because during processing the Garbage Collector is activated many times increasing the JNI overhead.

With the upcoming JCOBridge major version the JNI impact will be reduced and KNet will get extra performance both in produce and in consume.
**With the upcoming JCOBridge major version the JNI impact will be reduced and KNet will get extra performance both in produce and in consume.**

The first measures done with an internal alfa release reports big improvements. Here some cases:
- 100 messages:
- 10 bytes: Produce Average ratio percentage from 18 to 11, Consume Average ratio percentage from 94 to 88
- 100 bytes: Produce Average ratio percentage from 38 to 17, Consume Average ratio percentage from 94 to 88
- 1000 bytes: Produce Average ratio percentage from 30 to 16, Consume Average ratio percentage from 68 to 38
- 10000 bytes: Produce Average ratio percentage from 26 to 24, Consume Average ratio percentage from 71 to 47
- 100000 bytes: Produce Average ratio percentage from 69 to 43, Consume Average ratio percentage from 108 to 101
- 1000 messages:
- 10 bytes: Produce Average ratio percentage from 197 to 111, Consume Average ratio percentage from 192 to 137
- 100 bytes: Produce Average ratio percentage from 109 to 86, Consume Average ratio percentage from 521 to 276
- 1000 bytes: Produce Average ratio percentage from 57 to 53, Consume Average ratio percentage from 103 to 56
- 10000 bytes: Produce Average ratio percentage from 52 to 48, Consume Average ratio percentage from 255 to 177
- 100000 bytes: Produce Average ratio percentage from 75 to 71, Consume Average ratio percentage from 163 to 146
- 10000 messages:
- 10 bytes: Produce Average ratio percentage from 2102 to 656, Consume Average ratio percentage from 9153 to 4139
- 100 bytes: Produce Average ratio percentage from 796 to 410, Consume Average ratio percentage from 7948 to 3378
- 1000 bytes: Produce Average ratio percentage from 173 to 81, Consume Average ratio percentage from 3848 to 1668
- 10000 bytes: Produce Average ratio percentage from 123 to 80, Consume Average ratio percentage from 706 to 390
- 100000 bytes: Produce Average ratio percentage from 100 to 96, Consume Average ratio percentage from 213 to 172


133 changes: 123 additions & 10 deletions src/net/KNet/ClientSide/BridgedClasses/Clients/Consumer/KNetConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,30 @@
using Java.Util.Regex;
using MASES.JCOBridge.C2JBridge;
using System;
using System.Collections.Concurrent;

namespace MASES.KNet.Clients.Consumer
{
public class Message<K, V>
{
readonly ConsumerRecord<K, V> record;
readonly KNetConsumerCallback<K, V> obj;
internal Message(KNetConsumerCallback<K, V> obj)
{
this.obj = obj;
}
internal Message(ConsumerRecord<K, V> record)
{
this.record = record;
}

//public string Topic => data.TypedEventData;
public string Topic => record != null ? record.Topic : obj.Instance.Invoke<string>("getTopic");

//public int Partition => data.GetAt<int>(0);
public int Partition => record != null ? record.Partition : obj.Instance.Invoke<int>("getPartition");

public K Key => obj.Instance.Invoke<K>("getKey");
public K Key => record != null ? record.Key : obj.Instance.Invoke<K>("getKey");

public V Value => obj.Instance.Invoke<V>("getValue");
public V Value => record != null ? record.Value : obj.Instance.Invoke<V>("getValue");
}

public interface IKNetConsumerCallback<K, V> : IJVMBridgeBase
Expand Down Expand Up @@ -73,11 +79,26 @@ public virtual void RecordReady(Message<K, V> message) { }

public interface IKNetConsumer<K, V> : IConsumer<K, V>
{
bool IsCompleting { get; }

bool IsEmpty { get; }

int WaitingMessages { get; }

void SetCallback(Action<Message<K, V>> cb);

void ConsumeAsync(long timeoutMs);

void Consume(long timeoutMs, Action<Message<K, V>> callback);
}

public class KNetConsumer<K, V> : KafkaConsumer<K, V>, IKNetConsumer<K, V>
{
bool threadRunning = false;
long dequeing = 0;
readonly System.Threading.Thread consumeThread = null;
readonly System.Threading.ManualResetEvent threadExited = null;
readonly ConcurrentQueue<ConsumerRecords<K, V>> consumedRecords = null;
readonly KNetConsumerCallback<K, V> consumerCallback = null;

public override string ClassName => "org.mases.knet.clients.consumer.KNetConsumer";
Expand All @@ -86,18 +107,40 @@ public KNetConsumer()
{
}

public KNetConsumer(Properties props)
public KNetConsumer(Properties props, bool useJVMCallback = false)
: base(props)
{
consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage);
IExecute("setCallback", consumerCallback);
if (useJVMCallback)
{
consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage);
IExecute("setCallback", consumerCallback);
}
else
{
consumedRecords = new();
threadRunning = true;
threadExited = new(false);
consumeThread = new(ConsumeHandler);
consumeThread.Start();
}
}

public KNetConsumer(Properties props, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer)
public KNetConsumer(Properties props, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, bool useJVMCallback = false)
: base(props, keyDeserializer, valueDeserializer)
{
consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage);
IExecute("setCallback", consumerCallback);
if (useJVMCallback)
{
consumerCallback = new KNetConsumerCallback<K, V>(CallbackMessage);
IExecute("setCallback", consumerCallback);
}
else
{
consumedRecords = new();
threadRunning = true;
threadExited = new(false);
consumeThread = new(ConsumeHandler);
consumeThread.Start();
}
}

Action<Message<K, V>> actionCallback = null;
Expand All @@ -112,11 +155,81 @@ public override void Dispose()
base.Dispose();
IExecute("setCallback", null);
consumerCallback?.Dispose();
threadRunning = false;
if (consumedRecords != null)
{
lock (consumedRecords)
{
System.Threading.Monitor.Pulse(consumedRecords);
}
while (IsCompleting) { threadExited.WaitOne(100); };
actionCallback = null;
}
}

public void SetCallback(Action<Message<K, V>> cb)
{
actionCallback = cb;
}

void ConsumeHandler(object o)
{
try
{
while (threadRunning)
{
if (consumedRecords.TryDequeue(out ConsumerRecords<K, V> records))
{
System.Threading.Interlocked.Increment(ref dequeing);
try
{
foreach (var item in records)
{
actionCallback?.Invoke(new Message<K, V>(item));
}
}
catch { }
finally
{
System.Threading.Interlocked.Decrement(ref dequeing);
}
}
else
{
lock (consumedRecords)
{
System.Threading.Monitor.Wait(consumedRecords);
}
}
}
}
catch { }
finally { threadExited.Set(); threadRunning = false; }
}

public bool IsCompleting => !consumedRecords.IsEmpty || System.Threading.Interlocked.Read(ref dequeing) != 0;

public bool IsEmpty => consumedRecords.IsEmpty;

public int WaitingMessages => consumedRecords.Count;

public void ConsumeAsync(long timeoutMs)
{
Duration duration = TimeSpan.FromMilliseconds(timeoutMs);
if (consumedRecords == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to true.");
if (!threadRunning) throw new InvalidOperationException("Dispatching thread is not running.");
var results = this.Poll(duration);
consumedRecords.Enqueue(results);
lock (consumedRecords)
{
System.Threading.Monitor.Pulse(consumedRecords);
}
}

public void Consume(long timeoutMs, Action<Message<K, V>> callback)
{
Duration duration = TimeSpan.FromMilliseconds(timeoutMs);
if (consumerCallback == null) throw new ArgumentException("Cannot be used since constructor was called with useJVMCallback set to false.");
try
{
actionCallback = callback;
Expand Down
Loading

0 comments on commit 40e402c

Please sign in to comment.