Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLASH-296] [DM] Use Background Threads To Run Delta Merge Operations #266

Merged
merged 8 commits into from
Oct 19, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dbms/src/Common/Allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void * Allocator<clear_memory_>::realloc(void * buf, size_t old_size, size_t new
buf = ::realloc(buf, new_size);

if (nullptr == buf)
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + DB::toString(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + DB::toString(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);

if (clear_memory && new_size > old_size)
memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/InterpreterManageQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ BlockIO InterpreterManageQuery::execute()
manageable_storage->checkStatus(context);
return {};
}
case ManageOperation::Enum ::DeleteRows:
{
manageable_storage->deleteRows(context, ast.rows);
return {};
}
}
return {};
}
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Parsers/ASTManageQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ namespace ManageOperation
Flush,
Status,
Check,
DeleteRows,
};

inline const char * toString(UInt64 op)
{
static const char * data[] = {"Flush", "Status", "Check"};
return op < 3 ? data[op] : "Unknown operation";
static const char * data[] = {"Flush", "Status", "Check", "Delete Rows"};
return op < 4 ? data[op] : "Unknown operation";
}
}

Expand All @@ -31,6 +32,8 @@ class ASTManageQuery : public IAST

ManageOperation::Enum operation;

size_t rows = 0;

/** Get the text that identifies this element. */
String getID() const override
{
Expand All @@ -51,6 +54,8 @@ class ASTManageQuery : public IAST
<< (!database.empty() ? backQuoteIfNeed(database) + "." : "") << backQuoteIfNeed(table) << " "
<< (settings.hilite ? hilite_keyword : "") << ManageOperation::toString(operation)
<< (settings.hilite ? hilite_none : "");
if(operation == ManageOperation::Enum::DeleteRows)
settings.ostr << " " << rows;
}
};
}
3 changes: 2 additions & 1 deletion dbms/src/Parsers/ParserInsertQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_values("VALUES");
ParserKeyword s_format("FORMAT");
ParserKeyword s_select("SELECT");
ParserKeyword s_selraw("SELRAW");
ParserKeyword s_with("WITH");
ParserToken s_lparen(TokenType::OpeningRoundBracket);
ParserToken s_rparen(TokenType::ClosingRoundBracket);
Expand Down Expand Up @@ -137,7 +138,7 @@ bool ParserInsertQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (data < end && *data == '\n')
++data;
}
else if (s_select.ignore(pos, expected) || s_with.ignore(pos,expected))
else if (s_select.ignore(pos, expected) || s_selraw.ignore(pos, expected) || s_with.ignore(pos,expected))
{
pos = before_select;
ParserSelectWithUnionQuery select_p;
Expand Down
28 changes: 19 additions & 9 deletions dbms/src/Parsers/ParserManageQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bool ParserManageQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_flush("FLUSH");
ParserKeyword s_status("STATUS");
ParserKeyword s_check("CHECK");
ParserKeyword s_delete_rows("DELETE ROWS");

ParserToken s_dot(TokenType::Dot);
ParserIdentifier name_p;
Expand All @@ -37,25 +38,34 @@ bool ParserManageQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
}

auto query = std::make_shared<ASTManageQuery>();
node = query;

if (database)
query->database = typeid_cast<const ASTIdentifier &>(*database).name;
if (table)
query->table = typeid_cast<const ASTIdentifier &>(*table).name;

if (s_flush.ignore(pos, expected))
operation = ManageOperation::Enum::Flush;
else if (s_status.ignore(pos, expected))
operation = ManageOperation::Enum::Status;
else if (s_check.ignore(pos, expected))
operation = ManageOperation::Enum::Check;
else if (s_delete_rows.ignore(pos, expected))
{
operation = ManageOperation::Enum::DeleteRows;
ParserNumber num;
ASTPtr rows;
if (!num.parse(pos, rows, expected))
return false;
query->rows = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*rows).value);
}
else
return false;

auto query = std::make_shared<ASTManageQuery>();
node = query;

if (database)
query->database = typeid_cast<const ASTIdentifier &>(*database).name;
if (table)
query->table = typeid_cast<const ASTIdentifier &>(*table).name;

query->operation = operation;

return true;
}
}
} // namespace DB
11 changes: 11 additions & 0 deletions dbms/src/Storages/DeltaMerge/Chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,17 @@ void serializeChunks(
extra2->serialize(buf);
}

void serializeChunks(WriteBuffer & buf, Chunks::const_iterator begin, Chunks ::const_iterator end, const Chunks & extra_chunks)
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
{
auto size = (UInt64)(end - begin) + extra_chunks.size();
writeIntBinary(size, buf);

for (; begin != end; ++begin)
(*begin).serialize(buf);
for (auto & chunk : extra_chunks)
chunk.serialize(buf);
}

Chunks deserializeChunks(ReadBuffer & buf)
{
Chunks chunks;
Expand Down
16 changes: 11 additions & 5 deletions dbms/src/Storages/DeltaMerge/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class Chunk
// Binary version of chunk
using Version = UInt32;
static const Version CURRENT_VERSION;

public:
using ColumnMetaMap = std::unordered_map<ColId, ColumnMeta>;

Expand Down Expand Up @@ -122,11 +123,16 @@ using GenPageId = std::function<PageId()>;
Chunk createRefChunk(const Chunk & chunk, const GenPageId & gen_data_page_id, WriteBatch & wb);
Chunks createRefChunks(const Chunks & chunks, const GenPageId & gen_data_page_id, WriteBatch & wb);

void serializeChunks(WriteBuffer & buf,
Chunks::const_iterator begin,
Chunks ::const_iterator end,
const Chunk * extra1 = nullptr,
const Chunk * extra2 = nullptr);
void serializeChunks(WriteBuffer & buf,
Chunks::const_iterator begin,
Chunks ::const_iterator end,
const Chunk * extra1 = nullptr,
const Chunk * extra2 = nullptr);
void serializeChunks(WriteBuffer & buf, //
Chunks::const_iterator begin,
Chunks ::const_iterator end,
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
const Chunks & extr_chunks);

Chunks deserializeChunks(ReadBuffer & buf);

Chunk prepareChunkDataWrite(const DMContext & dm_context, const GenPageId & gen_data_page_id, WriteBatch & wb, const Block & block);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class ChunkBlockInputStream final : public IBlockInputStream
const ColumnDefines & read_columns_,
const PageReader & page_reader_,
const RSOperatorPtr & filter)
: chunks(chunks_), skip_chunks(chunks.size(), 0), read_columns(read_columns_), page_reader(page_reader_)
: chunks(std::move(chunks_)), skip_chunks(chunks.size(), 0), read_columns(read_columns_), page_reader(page_reader_)
{
if (filter)
{
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ struct DMContext

const NotCompress & not_compress;

// The rows of segment.
const size_t segment_rows;
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved

// The threshold of delta.
const size_t delta_limit_rows;
const size_t delta_limit_bytes;
Expand Down
18 changes: 11 additions & 7 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
/// If handle_real_type_ is empty, means do not convert handle column back to real type.
DMSegmentThreadInputStream(const SegmentReadTaskPoolPtr & task_pool_,
const SegmentStreamCreator & stream_creator_,
const AfterSegmentRead & after_segment_read_,
const ColumnDefines & columns_to_read_)
: task_pool(task_pool_),
stream_creator(stream_creator_),
after_segment_read(after_segment_read_),
columns_to_read(columns_to_read_),
header(toEmptyBlock(columns_to_read)),
log(&Logger::get("DMSegmentThreadInputStream"))
Expand All @@ -41,9 +43,9 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
return {};
}

cur_segment_id = task->segment->segmentId();
cur_stream = stream_creator(*task);
LOG_TRACE(log, "Start to read segment [" + DB::toString(cur_segment_id) + "]");
cur_segment = task->segment;
cur_stream = stream_creator(*task);
LOG_TRACE(log, "Start to read segment [" + DB::toString(cur_segment->segmentId()) + "]");
}

Block res = cur_stream->read();
Expand All @@ -56,9 +58,10 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
}
else
{
LOG_TRACE(log, "Finish reading segment [" + DB::toString(cur_segment_id) + "]");
cur_segment_id = 0;
cur_stream = {};
after_segment_read(cur_segment);
LOG_TRACE(log, "Finish reading segment [" + DB::toString(cur_segment->segmentId()) + "]");
cur_segment = {};
cur_stream = {};
}
}
}
Expand All @@ -74,12 +77,13 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
private:
SegmentReadTaskPoolPtr task_pool;
SegmentStreamCreator stream_creator;
AfterSegmentRead after_segment_read;
ColumnDefines columns_to_read;
Block header;

bool done = false;
BlockInputStreamPtr cur_stream;
UInt64 cur_segment_id;
SegmentPtr cur_segment;

Logger * log;
};
Expand Down
18 changes: 9 additions & 9 deletions dbms/src/Storages/DeltaMerge/DeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,14 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream

if constexpr (!c_delta_done)
{
auto tuple_id = entry_it.getValue();
auto value = entry_it.getValue();
switch (entry_it.getType())
{
case DT_DEL:
writeDeleteFromDelta(1);
writeDeleteFromDelta(value);
break;
case DT_INS:
writeInsertFromDelta(output_columns, tuple_id);
writeInsertFromDelta(output_columns, value);
--output_write_limit;
break;
default:
Expand Down Expand Up @@ -242,15 +242,15 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
if (!stable_input_stream_raw_ptr->hasNext())
throw Exception("Unexpected end of block, need more rows to skip");

size_t rows = stable_input_stream_raw_ptr->nextRows();
if (!stable_input_stream_raw_ptr->shouldSkipNext())
ssize_t rows = stable_input_stream_raw_ptr->nextRows();
if (stable_skip > rows || stable_input_stream_raw_ptr->shouldSkipNext())
{
fillStableBlockIfNeeded();
stable_input_stream_raw_ptr->skipNext();
stable_skip -= rows;
}
else
{
stable_input_stream_raw_ptr->skipNext();
stable_skip -= rows;
fillStableBlockIfNeeded();
}
}

Expand Down Expand Up @@ -328,7 +328,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
{
auto prev_sid = entry_it.getSid();
if (entry_it.getType() == DT_DEL)
prev_sid += 1;
prev_sid += entry_it.getValue();

++entry_it;

Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ using ColumnMap = std::unordered_map<ColId, ColumnPtr>;
using MutableColumnMap = std::unordered_map<ColId, MutableColumnPtr>;
using LockGuard = std::lock_guard<std::mutex>;

static const UInt64 INITIAL_EPOCH = 5; // Following TiDB, and I have no idea why 5 is chosen.
static const UInt64 INITIAL_EPOCH = 0;

// TODO maybe we should use those variables instead of macros?
#define EXTRA_HANDLE_COLUMN_NAME ::DB::MutableSupport::tidb_pk_column_name
Expand Down Expand Up @@ -129,8 +129,7 @@ static constexpr Handle P_INF_HANDLE = MAX_INT64; // Used in range, indicating p
static_assert(static_cast<Int64>(static_cast<UInt64>(MIN_INT64)) == MIN_INT64, "Unsupported compiler!");
static_assert(static_cast<Int64>(static_cast<UInt64>(MAX_INT64)) == MAX_INT64, "Unsupported compiler!");

static constexpr UInt64 DEL_RANGE_POS_MARK = (1ULL << 63);

static constexpr bool DM_RUN_CHECK = true;

} // namespace DM
} // namespace DB
7 changes: 2 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore-internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ DeltaMergeStore::WriteActions prepareWriteActions(const Block &
: std::lower_bound(handle_data.cbegin() + offset, handle_data.cend(), range.end);
size_t limit = end_pos - (handle_data.cbegin() + offset);

actions.emplace_back(DeltaMergeStore::WriteAction{.segment = segment, .offset = offset, .limit = limit});
actions.emplace_back(segment, offset, limit);

offset += limit;
}
Expand All @@ -54,10 +54,7 @@ DeltaMergeStore::WriteActions prepareWriteActions(const HandleRange &
{
(void)handle_;
if (segment->getRange().intersect(delete_range))
{
// TODO maybe more precise on `action.update`
actions.emplace_back(DeltaMergeStore::WriteAction{.segment = segment, .offset = 0, .limit = 0, .update = delete_range});
}
actions.emplace_back(segment, delete_range);
}

return actions;
Expand Down
Loading