Skip to content

Commit

Permalink
Atomic read/write on DeltaMergeStore level (pingcap#172)
Browse files Browse the repository at this point in the history
* Atomic read/write on DeltaMergeStore level

* Bug fix

* bugfix 2

* bug fix

* fix compile error

* remove useless code

* address comments

* fix compile error
  • Loading branch information
flowbehappy authored and JaySon-Huang committed Oct 30, 2019
1 parent 1a96307 commit a52ca1f
Show file tree
Hide file tree
Showing 24 changed files with 676 additions and 457 deletions.
9 changes: 8 additions & 1 deletion dbms/src/Common/EventRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>


/// This class is NOT multi-threads safe!
class EventRecorder
{
public:
Expand All @@ -12,8 +12,14 @@ class EventRecorder
watch.start();
}

~EventRecorder()
{
if (!done) submit();
}

inline void submit()
{
done = true;
ProfileEvents::increment(event);
ProfileEvents::increment(event_elapsed, watch.elapsed());
}
Expand All @@ -23,4 +29,5 @@ class EventRecorder
ProfileEvents::Event event_elapsed;

Stopwatch watch;
bool done = false;
};
10 changes: 8 additions & 2 deletions dbms/src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,14 @@
\
M(DMWriteBlock) \
M(DMWriteBlockNS) \
M(DMAppendDelta) \
M(DMAppendDeltaNS) \
M(DMAppendDeltaPrepare) \
M(DMAppendDeltaPrepareNS) \
M(DMAppendDeltaCommitMemory) \
M(DMAppendDeltaCommitMemoryNS) \
M(DMAppendDeltaCommitDisk) \
M(DMAppendDeltaCommitDiskNS) \
M(DMAppendDeltaCleanUp) \
M(DMAppendDeltaCleanUpNS) \
M(DMPlace) \
M(DMPlaceNS) \
M(DMPlaceUpsert) \
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/Chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void deserializeColumn(IColumn & column, const ColumnMeta & meta, const Page & p
void readChunkData(MutableColumns & columns,
const Chunk & chunk,
const ColumnDefines & column_defines,
PageStorage & storage,
const PageReader & page_reader,
size_t rows_offset,
size_t rows_limit)
{
Expand Down Expand Up @@ -177,11 +177,11 @@ void readChunkData(MutableColumns & columns,
col.insertRangeFrom(*tmp_col, rows_offset, rows_limit);
}
};
storage.read(page_ids, page_handler);
page_reader.read(page_ids, page_handler);
}


Block readChunk(const Chunk & chunk, const ColumnDefines & read_column_defines, PageStorage & data_storage)
Block readChunk(const Chunk & chunk, const ColumnDefines & read_column_defines, const PageReader & page_reader)
{
if (read_column_defines.empty())
return {};
Expand All @@ -196,7 +196,7 @@ Block readChunk(const Chunk & chunk, const ColumnDefines & read_column_defines,
if (chunk.getRows())
{
// Read from storage
readChunkData(columns, chunk, read_column_defines, data_storage, 0, chunk.getRows());
readChunkData(columns, chunk, read_column_defines, page_reader, 0, chunk.getRows());
}

Block res;
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class Chunk
size_t rows = 0;
};

// TODO: use list instead of vector, so that DiskValueSpace won't need to do copy during Segment#getReadSnapshot.
using Chunks = std::vector<Chunk>;
using GenPageId = std::function<PageId()>;

Expand All @@ -116,12 +117,12 @@ Chunk prepareChunkDataWrite(const DMContext & dm_context, const GenPageId & gen_
void readChunkData(MutableColumns & columns,
const Chunk & chunk,
const ColumnDefines & column_defines,
PageStorage & storage,
const PageReader & page_reader,
size_t rows_offset,
size_t rows_limit);


Block readChunk(const Chunk & chunk, const ColumnDefines & read_column_defines, PageStorage & data_storage);
Block readChunk(const Chunk & chunk, const ColumnDefines & read_column_defines, const PageReader & page_reader);


} // namespace DM
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/DeltaMerge/ChunkBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ namespace DM
class ChunkBlockInputStream final : public IBlockInputStream
{
public:
ChunkBlockInputStream(const Chunks & chunks_, const ColumnDefines & read_columns_, PageStorage & data_storage_)
: chunks(chunks_), read_columns(read_columns_), data_storage(data_storage_)
ChunkBlockInputStream(const Chunks & chunks_, const ColumnDefines & read_columns_, const PageReader & page_reader_)
: chunks(chunks_), read_columns(read_columns_), page_reader(page_reader_)
{
}

Expand All @@ -36,7 +36,7 @@ class ChunkBlockInputStream final : public IBlockInputStream
{
if (!hasNextBlock())
return {};
return readChunk(chunks[chunk_index++], read_columns, data_storage);
return readChunk(chunks[chunk_index++], read_columns, page_reader);
}

bool hasNextBlock() { return chunk_index < chunks.size(); }
Expand All @@ -48,7 +48,7 @@ class ChunkBlockInputStream final : public IBlockInputStream
Chunks chunks;
size_t chunk_index = 0;
ColumnDefines read_columns;
PageStorage & data_storage;
PageReader page_reader;
Block header;
};

Expand Down
15 changes: 11 additions & 4 deletions dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
public:
/// If handle_real_type_ is empty, means do not convert handle column back to real type.
DMSegmentThreadInputStream(const SegmentReadTaskPoolPtr & task_pool_,
const SegmentStreamCreator & stream_creator_,
const ColumnDefines & columns_to_read_,
const String & handle_name_,
const DataTypePtr & handle_real_type_,
const Context & context_)
: task_pool(task_pool_),
stream_creator(stream_creator_),
columns_to_read(columns_to_read_),
header(createHeader(columns_to_read)),
handle_name(handle_name_),
handle_real_type(handle_real_type_),
context(context_),
log(&Logger::get("SegmentReadTaskPool"))
log(&Logger::get("DMSegmentThreadInputStream"))
{
}

Expand All @@ -38,12 +40,15 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
{
if (!cur_stream)
{
std::tie(cur_segment_id, cur_stream) = task_pool->nextTask();
if (!cur_stream) // we are done.
auto task = task_pool->nextTask();
if (!task)
{
done = true;
return {};
}

cur_segment_id = task->segment->segmentId();
cur_stream = stream_creator(*task);
LOG_DEBUG(log, "Start to read segment [" + DB::toString(cur_segment_id) + "]");
}

Expand All @@ -57,7 +62,8 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream
}
else
{
cur_stream = {};
cur_segment_id = 0;
cur_stream = {};
LOG_DEBUG(log, "Finish reading segment [" + DB::toString(cur_segment_id) + "]");
}
}
Expand All @@ -80,6 +86,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream

private:
SegmentReadTaskPoolPtr task_pool;
SegmentStreamCreator stream_creator;
ColumnDefines columns_to_read;
Block header;
String handle_name;
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
ssize_t stable_skip = 0;

DeltaValueSpacePtr delta_value_space;
size_t delta_rows_limit;
IndexIterator entry_it;
IndexIterator entry_end;

Expand Down Expand Up @@ -71,6 +72,7 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
stable_input_stream(stable_input_stream_),
stable_input_stream_raw_ptr(stable_input_stream.get()),
delta_value_space(delta_value_space_),
delta_rows_limit(delta_value_space->getRows()),
entry_it(index_begin),
entry_end(index_end),
max_block_size(max_block_size_)
Expand Down Expand Up @@ -159,8 +161,11 @@ class DeltaMergeBlockInputStream final : public IProfilingBlockInputStream
}
else if (entry_it.getType() == DT_INS)
{
writeInsertFromDelta(output_columns, entry_it.getValue());
--output_write_limit;
if (entry_it.getValue() < delta_rows_limit)
{
writeInsertFromDelta(output_columns, entry_it.getValue());
--output_write_limit;
}
}
else
{
Expand Down
Loading

0 comments on commit a52ca1f

Please sign in to comment.