diff --git a/src/workers.cc b/src/workers.cc index eb711a6f..9fa7ef20 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -525,7 +525,7 @@ KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {} uint32_t getTick() { struct timespec ts; unsigned theTick = 0U; - clock_gettime( CLOCK_MONOTONIC, &ts ); + clock_gettime(CLOCK_MONOTONIC, &ts); theTick = ts.tv_nsec / 1000000; theTick += ts.tv_sec * 1000; return theTick; @@ -541,34 +541,31 @@ void KafkaConsumerConsumeNum::Execute() { #ifndef _WIN32 struct timespec ts; unsigned start = 0U; - clock_gettime( CLOCK_MONOTONIC, &ts ); + clock_gettime(CLOCK_MONOTONIC, &ts); start = ts.tv_nsec / 1000000; start += ts.tv_sec * 1000; #else - long int start = GetTickCount(); + int start = GetTickCount(); #endif while (m_messages.size() < max && looping) { // Get a message - // TODO: Create a new property like setDefaultConsumeTimeout(), called setDefaultConsumeTotalTimeout() - // For now, we use the default consume timeout as total, and the actual consume timeout is total / 10 - // Eg for a value of 1000ms, we consume every 100ms. Baton b = m_consumer->Consume(timeout_ms); #ifndef _WIN32 unsigned end = 0U; - clock_gettime( CLOCK_MONOTONIC, &ts ); + clock_gettime(CLOCK_MONOTONIC, &ts); end = ts.tv_nsec / 1000000; end += ts.tv_sec * 1000; - unsigned elapsed = (end - start); - if (elapsed >= toal_timeout_ms) { - looping = false; - } + unsigned elapsed = (end - start); + if (elapsed >= toal_timeout_ms) { + looping = false; + } #else - long int end = GetTickCount(); - long elapsed = (end - start); - if (elapsed >= toal_timeout_ms) { - looping = false; - } + int end = GetTickCount(); + int elapsed = (end - start); + if (elapsed >= toal_timeout_ms) { + looping = false; + } #endif switch (b.err()) { case RdKafka::ERR__PARTITION_EOF: