Skip to content

Commit

Permalink
support disorder sort
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored and kaijchen committed Jun 29, 2023
1 parent 7196430 commit 1b59800
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 11 deletions.
12 changes: 6 additions & 6 deletions be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ namespace doris {
using namespace ErrorCode;

Status DeltaWriterV2::open(WriteRequest* req, DeltaWriterV2** writer, RuntimeProfile* profile,
const UniqueId& load_id) {
const UniqueId& load_id) {
*writer = new DeltaWriterV2(req, StorageEngine::instance(), profile, load_id);
return Status::OK();
}

DeltaWriterV2::DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile,
const UniqueId& load_id)
DeltaWriterV2::DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine,
RuntimeProfile* profile, const UniqueId& load_id)
: _req(*req),
_tablet(nullptr),
_cur_rowset(nullptr),
Expand Down Expand Up @@ -232,7 +232,7 @@ Status DeltaWriterV2::append(const vectorized::Block* block) {
}

Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<int>& row_idxs,
bool is_append) {
bool is_append) {
if (UNLIKELY(row_idxs.empty() && !is_append)) {
return Status::OK();
}
Expand Down Expand Up @@ -569,8 +569,8 @@ int64_t DeltaWriterV2::partition_id() const {
}

void DeltaWriterV2::_build_current_tablet_schema(int64_t index_id,
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema) {
const OlapTableSchemaParam* table_schema_param,
const TabletSchema& ori_tablet_schema) {
_tablet_schema->copy_from(ori_tablet_schema);
// find the right index id
int i = 0;
Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ namespace vectorized {
class Block;
} // namespace vectorized


// Writer for a particular (load, index, tablet).
// This class is NOT thread-safe, external synchronization is required.
class DeltaWriterV2 {
Expand Down Expand Up @@ -130,7 +129,7 @@ class DeltaWriterV2 {

private:
DeltaWriterV2(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile,
const UniqueId& load_id);
const UniqueId& load_id);

// push a full memtable to flush executor
Status _flush_memtable_async();
Expand Down
31 changes: 31 additions & 0 deletions be/src/runtime/sink_stream_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ bool TargetSegmentComparator::operator()(const TargetSegmentPtr& lhs,
if (lhs->segmentid != rhs->segmentid) {
return lhs->segmentid < rhs->segmentid;
}
if (lhs->backendid != rhs->backendid) {
return lhs->backendid < rhs->backendid;
}
return false;
}

Expand Down Expand Up @@ -172,6 +175,7 @@ void SinkStreamHandler::_parse_header(butil::IOBuf* const message, PStreamHeader
<< ", schema_hash = " << hdr.tablet_schema_hash();
}

// TODO: delete this method
uint64_t SinkStreamHandler::get_next_segmentid(TargetRowsetPtr target_rowset) {
// TODO: need support concurrent flush memtable
{
Expand All @@ -185,6 +189,33 @@ uint64_t SinkStreamHandler::get_next_segmentid(TargetRowsetPtr target_rowset) {
}
}

uint64_t SinkStreamHandler::get_next_segmentid(TargetRowsetPtr target_rowset, int64_t segmentid,
int64_t backendid) {
// TODO: delete id;
std::lock_guard<std::mutex> l(_tablet_segment_next_id_lock);
TargetSegmentPtr target_segment = std::make_shared<TargetSegment>();
target_segment->target_rowset = target_rowset;
target_segment->segmentid = segmentid;
target_segment->backendid = backendid;

auto it = _tablet_segment_pos.find(target_segment);
if (it != _tablet_segment_pos.end()) {
return it->second;
}
for (int64_t i = 0; i <= segmentid; i++) {
TargetSegmentPtr front_target_segment = std::make_shared<TargetSegment>();
front_target_segment->target_rowset = target_rowset;
front_target_segment->segmentid = i;
front_target_segment->backendid = backendid;
auto it = _tablet_segment_pos.find(front_target_segment);
if (it == _tablet_segment_pos.end()) {
_tablet_segment_pos.emplace(front_target_segment, _current_id);
_current_id++;
}
}
return _tablet_segment_pos.find(target_segment)->second;
}

Status SinkStreamHandler::_build_rowset(TargetRowsetPtr target_rowset,
const RowsetMetaPB& rowset_meta_pb) {
RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/sink_stream_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ struct TargetRowsetComparator {
struct TargetSegment {
TargetRowsetPtr target_rowset;
int64_t segmentid;
// std::string ip_port or BE id?
int64_t backendid;

std::string to_string();
};
Expand Down Expand Up @@ -79,6 +79,8 @@ class SinkStreamHandler : public StreamInputHandler {
void _report_status(StreamId stream, TargetRowsetPtr target_rowset, bool is_success,
std::string error_msg);
uint64_t get_next_segmentid(TargetRowsetPtr target_rowset);
uint64_t get_next_segmentid(TargetRowsetPtr target_rowset, int64_t segmentid,
int64_t backendid);
Status _build_rowset(TargetRowsetPtr target_rowset, const RowsetMetaPB& rowset_meta);

private:
Expand All @@ -90,6 +92,8 @@ class SinkStreamHandler : public StreamInputHandler {
// TODO: make it per load
std::map<TargetRowsetPtr, size_t, TargetRowsetComparator> _tablet_segment_next_id;
std::mutex _tablet_segment_next_id_lock;
std::map<TargetSegmentPtr, int64_t, TargetSegmentComparator> _tablet_segment_pos;
int64_t _current_id = 0;
// TODO: make it per load
std::map<TargetSegmentPtr, std::shared_ptr<ThreadPoolToken>, TargetSegmentComparator>
_segment_token_map; // accessed in single thread, safe
Expand Down
3 changes: 1 addition & 2 deletions be/test/io/fs/stream_sink_file_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class StreamSinkFileWriterTest : public testing::Test {
};

public:
StreamSinkFileWriterTest() {}
StreamSinkFileWriterTest() { srand(time(nullptr)); }
~StreamSinkFileWriterTest() {}

protected:
Expand All @@ -102,7 +102,6 @@ class StreamSinkFileWriterTest : public testing::Test {
options.max_retry = FLAGS_max_retry;
std::stringstream port;
while (true) {
srand(time(nullptr));
port << "0.0.0.0:" << (rand() % 1000 + 8000);
if (channel.Init(port.str().c_str(), NULL) == 0) {
break;
Expand Down
29 changes: 29 additions & 0 deletions be/test/runtime/sink_stream_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -400,4 +400,33 @@ TEST_F(SinkStreamMgrTest, open_append_close_file_twice) {
client.disconnect();
}

TEST_F(SinkStreamMgrTest, get_next_segment_id) {
PUniqueId loadid;
loadid.set_hi(1);
loadid.set_lo(1);
RowsetId rowsetid;
rowsetid.init(1);

TargetRowsetPtr target_rowset = std::make_shared<TargetRowset>();
target_rowset->indexid = 1;
target_rowset->loadid = loadid;
target_rowset->tabletid = 1;
target_rowset->rowsetid = rowsetid;

SinkStreamHandler handler;

//test order
CHECK_EQ(0, handler.get_next_segmentid(target_rowset, 0, 1));
CHECK_EQ(1, handler.get_next_segmentid(target_rowset, 1, 1));
// test disorder
CHECK_EQ(4, handler.get_next_segmentid(target_rowset, 4, 1));
CHECK_EQ(2, handler.get_next_segmentid(target_rowset, 2, 1));
// test multiple be concurrent writes
CHECK_EQ(6, handler.get_next_segmentid(target_rowset, 1, 2));
CHECK_EQ(3, handler.get_next_segmentid(target_rowset, 3, 1));
CHECK_EQ(5, handler.get_next_segmentid(target_rowset, 0, 2));
CHECK_EQ(7, handler.get_next_segmentid(target_rowset, 2, 2));
CHECK_EQ(8, handler.get_next_segmentid(target_rowset, 3, 2));
}

} // namespace doris

0 comments on commit 1b59800

Please sign in to comment.