diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index 32240f11b23795..4b3954a27d911a 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -193,7 +193,11 @@ Status KafkaDataConsumer::group_consume( consumer_watch.stop(); switch (msg->err()) { case RdKafka::ERR_NO_ERROR: - if (!queue->blocking_put(msg.get())) { + if (msg->len() == 0) { + // ignore msg with length 0. + // put empty msg into queue will cause the load process shutting down. + break; + } else if (!queue->blocking_put(msg.get())) { // queue is shutdown done = true; } else {