Skip to content

Commit

Permalink
[improvement](routine-load) add routine load rows check (apache#25818)
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored Oct 25, 2023
1 parent 552091f commit 5e3277e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 1 deletion.
3 changes: 2 additions & 1 deletion be/src/runtime/routine_load/data_consumer_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
bool eos = false;
while (true) {
if (eos || left_time <= 0 || left_rows <= 0 || left_bytes <= 0) {
_rows = ctx->max_batch_rows - left_rows;
LOG(INFO) << "consumer group done: " << _grp_id
<< ". consume time(ms)=" << ctx->max_interval_s * 1000 - left_time
<< ", received rows=" << ctx->max_batch_rows - left_rows
<< ", received rows=" << _rows
<< ", received bytes=" << ctx->max_batch_size - left_bytes << ", eos: " << eos
<< ", left_time: " << left_time << ", left_rows: " << left_rows
<< ", left_bytes: " << left_bytes
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/routine_load/data_consumer_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class DataConsumerGroup {
++_counter;
}

// Returns the row count currently stored in _rows (the received-rows
// counter of this consumer group). Does not modify the group.
int64_t get_consumer_rows() const {
    return _rows;
}

// Overwrites the stored received-rows counter (_rows) with `rows`.
// No validation is performed on the supplied value.
void set_consumer_rows(int64_t rows) {
    _rows = rows;
}

// start all consumers
virtual Status start_all(std::shared_ptr<StreamLoadContext> ctx,
std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe) {
Expand All @@ -77,6 +81,8 @@ class DataConsumerGroup {
// when the counter becomes zero, shutdown the queue to finish
std::mutex _mutex;
int _counter;
// total number of rows received
int64_t _rows {0};
};

// for kafka
Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/routine_load/routine_load_task_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,15 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
// wait for all consumers finished
HANDLE_ERROR(ctx->future.get(), "consume failed");

// check received and load rows
LOG(INFO) << "routine load task received rows: " << consumer_grp.get()->get_consumer_rows()
<< " load total rows: " << ctx.get()->number_total_rows
<< " loaded rows: " << ctx.get()->number_loaded_rows
<< " filtered rows: " << ctx.get()->number_filtered_rows
<< " unselected rows: " << ctx.get()->number_unselected_rows;
DCHECK(consumer_grp.get()->get_consumer_rows() == ctx.get()->number_total_rows);
consumer_grp.get()->set_consumer_rows(0);

ctx->load_cost_millis = UnixMillis() - ctx->start_millis;

// return the consumer back to pool
Expand Down

0 comments on commit 5e3277e

Please sign in to comment.