Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 13 additions & 16 deletions src/workers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {}
uint32_t getTick() {
struct timespec ts;
unsigned theTick = 0U;
clock_gettime( CLOCK_MONOTONIC, &ts );
clock_gettime(CLOCK_MONOTONIC, &ts);
theTick = ts.tv_nsec / 1000000;
theTick += ts.tv_sec * 1000;
return theTick;
Expand All @@ -541,34 +541,31 @@ void KafkaConsumerConsumeNum::Execute() {
#ifndef _WIN32
struct timespec ts;
unsigned start = 0U;
clock_gettime( CLOCK_MONOTONIC, &ts );
clock_gettime(CLOCK_MONOTONIC, &ts);
start = ts.tv_nsec / 1000000;
start += ts.tv_sec * 1000;
#else
long int start = GetTickCount();
int start = GetTickCount();
#endif

while (m_messages.size() < max && looping) {
// Get a message
// TODO: Create a new property like setDefaultConsumeTimeout(), called setDefaultConsumeTotalTimeout()
// For now, we use the default consume timeout as total, and the actual consume timeout is total / 10
// Eg for a value of 1000ms, we consume every 100ms.
Baton b = m_consumer->Consume(timeout_ms);
#ifndef _WIN32
unsigned end = 0U;
clock_gettime( CLOCK_MONOTONIC, &ts );
clock_gettime(CLOCK_MONOTONIC, &ts);
end = ts.tv_nsec / 1000000;
end += ts.tv_sec * 1000;
unsigned elapsed = (end - start);
if (elapsed >= toal_timeout_ms) {
looping = false;
}
unsigned elapsed = (end - start);
if (elapsed >= toal_timeout_ms) {
looping = false;
}
#else
long int end = GetTickCount();
long elapsed = (end - start);
if (elapsed >= toal_timeout_ms) {
looping = false;
}
int end = GetTickCount();
int elapsed = (end - start);
if (elapsed >= toal_timeout_ms) {
looping = false;
}
#endif
switch (b.err()) {
case RdKafka::ERR__PARTITION_EOF:
Expand Down