Skip to content

Commit

Permalink
Fix minor compaction after restart (#6192)
Browse files Browse the repository at this point in the history
close #6159
  • Loading branch information
lidezhu authored Oct 27, 2022
1 parent dc75250 commit deb4c9d
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowK
column_files = deserializeSavedColumnFilesInV2Format(buf, version);
break;
case DeltaFormat::V3:
column_files = deserializeSavedColumnFilesInV3Format(context, segment_range, buf, version);
column_files = deserializeSavedColumnFilesInV3Format(context, segment_range, buf);
break;
default:
throw Exception("Unexpected delta value version: " + DB::toString(version) + ", latest version: " + DB::toString(DeltaFormat::V3),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void serializeSavedColumnFilesInV2Format(WriteBuffer & buf, const ColumnFilePers
ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(ReadBuffer & buf, UInt64 version);

void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePersisteds & column_files);
ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf, UInt64 version);
ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf);

} // namespace DM
} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePers
throw Exception("A tiny file without schema: " + column_file->toString(), ErrorCodes::LOGICAL_ERROR);

bool save_schema = cur_schema != last_schema;
last_schema = cur_schema;
column_file->serializeMetadata(buf, save_schema);
break;
}
Expand All @@ -60,7 +61,7 @@ void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePers
}
}

ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf, UInt64 /*version*/)
ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf)
{
size_t column_file_count;
readIntBinary(column_file_count, buf);
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>
rows += file->getRows();
bytes += file->getBytes();
deletes += file->getDeletes();
if (auto * m_file = file->tryToInMemoryFile(); m_file)
{
last_schema = m_file->getSchema();
}
else if (auto * t_file = file->tryToTinyFile(); t_file)
{
last_schema = t_file->getSchema();
}
}
}

Expand Down
30 changes: 30 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileBig.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDeleteRange.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
Expand Down Expand Up @@ -161,6 +163,34 @@ try
}
CATCH

TEST_F(ColumnFileTest, SerializeColumnFilePersisted)
try
{
WriteBatches wbs(dmContext().storage_pool, dmContext().getWriteLimiter());
MemoryWriteBuffer buff;
{
ColumnFilePersisteds column_file_persisteds;
size_t rows = 100; // arbitrary value
auto block = DMTestEnv::prepareSimpleWriteBlock(0, rows, false);
auto schema = std::make_shared<Block>(block.cloneEmpty());
column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema));
column_file_persisteds.emplace_back(std::make_shared<ColumnFileDeleteRange>(RowKeyRange::newAll(false, 1)));
column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema));
column_file_persisteds.emplace_back(std::make_shared<ColumnFileDeleteRange>(RowKeyRange::newAll(false, 1)));
column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema));
serializeSavedColumnFilesInV3Format(buff, column_file_persisteds);
}

{
auto read_buff = buff.tryGetReadBuffer();
auto column_file_persisteds = deserializeSavedColumnFilesInV3Format(dmContext(), RowKeyRange::newAll(false, 1), *read_buff);
ASSERT_EQ(column_file_persisteds.size(), 5);
ASSERT_EQ(column_file_persisteds[0]->tryToTinyFile()->getSchema(), column_file_persisteds[2]->tryToTinyFile()->getSchema());
ASSERT_EQ(column_file_persisteds[2]->tryToTinyFile()->getSchema(), column_file_persisteds[4]->tryToTinyFile()->getSchema());
}
}
CATCH

} // namespace tests
} // namespace DM
} // namespace DB
55 changes: 55 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,61 @@ try
}
CATCH

TEST_F(SegmentOperationTest, CheckColumnFileSchema)
try
{
writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID);
mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID);

{
LOG_DEBUG(log, "beginSegmentMergeDelta");

// Start a segment merge and suspend it before applyMerge
auto sp_seg_merge_delta_apply = SyncPointCtl::enableInScope("before_Segment::applyMergeDelta");
auto th_seg_merge_delta = std::async([&]() {
mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID, /* check_rows */ false);
});
sp_seg_merge_delta_apply.waitAndPause();

LOG_DEBUG(log, "pausedBeforeApplyMergeDelta");

// non-flushed column files
writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
sp_seg_merge_delta_apply.next();
th_seg_merge_delta.get();

LOG_DEBUG(log, "finishApplyMergeDelta");
}

{
ingestDTFileIntoSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
}
ASSERT_EQ(segments.size(), 1);
{
auto segment = segments[DELTA_MERGE_FIRST_SEGMENT_ID];
auto delta = segment->getDelta();
auto mem_table_set = delta->getMemTableSet();
WriteBatches wbs(dm_context->storage_pool);
auto column_files = mem_table_set->cloneColumnFiles(*dm_context, segment->getRowKeyRange(), wbs);
ASSERT_FALSE(column_files.empty());
BlockPtr last_schema;
for (const auto & column_file : column_files)
{
if (auto * t_file = column_file->tryToTinyFile(); t_file)
{
auto current_schema = t_file->getSchema();
ASSERT_TRUE(!last_schema || (last_schema == current_schema));
last_schema = current_schema;
}
}
// check last_schema is not nullptr after all
ASSERT_NE(last_schema, nullptr);
}
}
CATCH


TEST_F(SegmentOperationTest, SegmentLogicalSplit)
try
Expand Down

0 comments on commit deb4c9d

Please sign in to comment.