Read from Kafka very slow! #306
@magnum-ferox Do you use the low-level or the high-level consumer?
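For context, the two consumer styles refer to the underlying php-rdkafka extension classes. A minimal sketch of each (broker address and topic name here are placeholders, not values from this issue):

```php
<?php
// High-level consumer: subscribes via a consumer group; the broker
// coordinates partition assignment and offset commits.
$conf = new RdKafka\Conf();
$conf->set('group.id', 'my-group');
$conf->set('metadata.broker.list', 'localhost:9092');

$highLevel = new RdKafka\KafkaConsumer($conf);
$highLevel->subscribe(['a_topic']);
$message = $highLevel->consume(2000); // timeout in ms

// Low-level consumer: you choose partition and start offset yourself.
$lowLevel = new RdKafka\Consumer();
$lowLevel->addBrokers('localhost:9092');
$topic = $lowLevel->newTopic('a_topic');
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); // partition 0
$message = $topic->consume(0, 2000); // partition 0, 2000 ms timeout
```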
Default consumer:
Consumer async commit true:

To enable async commit do:

```php
<?php
$context = (new RdKafkaConnectionFactory($config))->createContext();

$topic = $context->createTopic('a_topic');

$consumer = $context->createConsumer($topic);
$consumer->setCommitAsync(true);
```
Script to test:

```php
<?php
use Enqueue\RdKafka\RdKafkaConnectionFactory;

include_once __DIR__.'/vendor/autoload.php';

$config = [
    'global' => [
        'group.id' => uniqid('', true),
        'metadata.broker.list' => getenv('RDKAFKA_HOST').':'.getenv('RDKAFKA_PORT'),
        'enable.auto.commit' => 'false',
    ],
    'topic' => [
        'auto.offset.reset' => 'beginning',
    ],
];

$context = (new RdKafkaConnectionFactory($config))->createContext();
sleep(3);

$topic = $context->createTopic(uniqid('', true));

$consumer = $context->createConsumer($topic);
$consumer->setCommitAsync(true);

// Produce 1010 messages and time it.
$time = microtime(true);
foreach (range(1, 1010) as $index) {
    $context->createProducer()->send($topic, $context->createMessage($index));
}
var_dump(microtime(true) - $time);

// Consume 1000 messages and time it.
$index = 0;
$time = microtime(true);
while ($index < 1000) {
    if ($message = $consumer->receive(2000)) {
        $consumer->acknowledge($message);
        $index++;
    }
}
var_dump(microtime(true) - $time);

$context->close();
```
No. I tried your script, and also without `$consumer->acknowledge($message);`. Environment: librdkafka-0.11.1-1.el7.x86_64, queue-interop/queue-interop (0.6.1). Ping to the Kafka server:
More than 1 second for each message!
arnaud-lb/php-rdkafka reads over 1000 messages per second!
Ping to the server is ~50 ms.