Skip to content

Commit

Permalink
more logs
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Sep 4, 2024
1 parent 360c6be commit 9695f71
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 7 deletions.
3 changes: 3 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const SegmentStatist
Status BetaRowsetWriterV2::flush_memtable(vectorized::Block* block, int32_t segment_id,
int64_t* flush_size) {
if (block->rows() == 0) {
LOG(INFO) << "skipped empty block when flush_memtable, segment_id=" << segment_id
<< ", load_id=" << print_id(_context.load_id)
<< ", tablet_id=" << _context.tablet_id;
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/beta_rowset_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class BetaRowsetWriterV2 : public RowsetWriter {

int32_t allocate_segment_id() override {
auto id = _segment_creator.allocate_segment_id();
LOG(INFO) << "allocating segment id " << id << ", load_id=" << print_id(_context.load_id)
LOG(INFO) << "allocating segment_id=" << id << ", load_id=" << print_id(_context.load_id)
<< ", tablet_id=" << _context.tablet_id;
return id;
}
Expand Down
13 changes: 11 additions & 2 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data

int64_t src_id = header.src_id();
uint32_t segid = header.segment_id();
LOG(INFO) << "append_data src_id=" << src_id
<< ", tablet_id=" << header.tablet_id() << ", segid=" << segid << *this;
// Ensure there are enough space and mapping are built.
SegIdMapping* mapping = nullptr;
{
Expand All @@ -128,8 +130,8 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
for (size_t index = origin_size; index <= segid; index++) {
mapping->at(index) = _next_segid;
_next_segid++;
VLOG_DEBUG << "src_id=" << src_id << ", segid=" << index << " to "
<< " segid=" << _next_segid - 1 << ", " << *this;
LOG(INFO) << "src_id=" << src_id << ", segid=" << index << " to "
<< " segid=" << _next_segid - 1 << ", " << *this;
}
}
}
Expand Down Expand Up @@ -190,6 +192,8 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data

int64_t src_id = header.src_id();
uint32_t segid = header.segment_id();
LOG(INFO) << "add_segment src_id=" << src_id << ", tablet_id=" << header.tablet_id()
<< ", segid=" << segid << *this;
uint32_t new_segid;
DBUG_EXECUTE_IF("TabletStream.add_segment.unknown_segid", { segid = UNKNOWN_ID_FOR_TEST; });
{
Expand Down Expand Up @@ -403,6 +407,11 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
<< _total_streams - _close_load_cnt << " senders, " << *this;

for (const auto& tablet : tablets_to_commit) {
LOG(INFO) << "received tablets commit info from src_id=" << src_id
<< ", load_id=" << print_id(_load_id) << " tablet_id=" << tablet.tablet_id()
<< ", num_segments=" << tablet.num_segments();
}
_tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
tablets_to_commit.end());

Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_stream_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ Status LoadStreamWriter::close_segment(uint32_t segid) {
}
g_load_stream_file_writer_cnt << -1;
LOG(INFO) << "segment " << segid << " path " << file_writer->path().native()
<< "closed, written " << file_writer->bytes_appended() << " bytes";
<< " closed, written " << file_writer->bytes_appended() << " bytes";
if (file_writer->bytes_appended() == 0) {
return Status::Corruption("segment {} closed with 0 bytes", file_writer->path().native());
}
Expand Down
9 changes: 6 additions & 3 deletions be/src/vec/sink/load_stream_map_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,22 @@ Status LoadStreamMap::close_load(bool incremental) {
for (const auto& [tablet_id, tablet] : tablets) {
tablets_to_commit.push_back(tablet);
tablets_to_commit.back().set_num_segments(_segments_for_tablet[tablet_id]);
LOG(INFO) << "reporting num segments, load_id=" << _load_id
<< ", tablet_id=" << tablet_id
<< ", num_segments=" << _segments_for_tablet[tablet_id];
}
bool first = true;
for (auto& stream : streams) {
if (stream->is_incremental() != incremental) {
continue;
}
if (first) {
LOG(INFO) << "sending CLOSE_LOAD with tablets to commit, load_id=" << _load_id
<< ", dst_id=" << stream->dst_id()
<< ", stream_id=" << stream->stream_id();
RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
first = false;
} else {
LOG(INFO) << "sending CLOSE_LOAD without tablets to commit, load_id=" << _load_id
<< ", dst_id=" << stream->dst_id()
<< ", stream_id=" << stream->stream_id();
RETURN_IF_ERROR(stream->close_load({}));
}
}
Expand Down
11 changes: 11 additions & 0 deletions be/src/vec/sink/load_stream_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ Status LoadStreamStub::append_data(int64_t partition_id, int64_t index_id, int64
header.set_segment_eos(segment_eos);
header.set_offset(offset);
header.set_opcode(doris::PStreamHeader::APPEND_DATA);
LOG(INFO) << "append_data, load_id=" << print_id(_load_id)
<< ", tablet_id=" << header.tablet_id() << ", segment_id=" << header.segment_id()
<< ", offset=" << header.offset() << ", dst_id=" << _dst_id
<< ", eos=" << header.segment_eos() << ", stream_id=" << _stream_id;
return _encode_and_send(header, data);
}

Expand All @@ -245,6 +249,9 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64
if (flush_schema != nullptr) {
flush_schema->to_schema_pb(header.mutable_flush_schema());
}
LOG(INFO) << "add_segment, load_id=" << print_id(_load_id)
<< ", tablet_id=" << header.tablet_id() << ", segment_id=" << tablet.segment_id()
<< ", dst_id=" << _dst_id << ", stream_id=" << _stream_id;
return _encode_and_send(header);
}

Expand All @@ -256,6 +263,10 @@ Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commi
header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets() = tablet;
LOG(INFO) << "reporting num segments, load_id=" << print_id(_load_id)
<< ", tablet_id=" << tablet.tablet_id()
<< ", num_segments=" << tablet.num_segments()
<< ", dst_id=" << _dst_id << ", stream_id=" << _stream_id;
}
return _encode_and_send(header);
}
Expand Down

0 comments on commit 9695f71

Please sign in to comment.