Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
sollhui authored and kaijchen committed Aug 1, 2023
1 parent 53a7070 commit f314acd
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 21 deletions.
22 changes: 9 additions & 13 deletions be/test/io/fs/stream_sink_file_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "gtest/gtest_pred_impl.h"
#include "olap/olap_common.h"
#include "util/debug/leakcheck_disabler.h"
#include "util/faststring.h"

namespace doris {
Expand Down Expand Up @@ -59,10 +60,9 @@ class StreamSinkFileWriterTest : public testing::Test {
virtual void on_closed(brpc::StreamId id) { LOG(INFO) << "Stream=" << id << " is closed"; }
};

class StreamingSinkFileService : public PBackendService_Stub {
class StreamingSinkFileService : public PBackendService {
public:
StreamingSinkFileService(brpc::Channel* channel)
: PBackendService_Stub(channel), _sd(brpc::INVALID_STREAM_ID) {}
StreamingSinkFileService() : _sd(brpc::INVALID_STREAM_ID) {}
virtual ~StreamingSinkFileService() { brpc::StreamClose(_sd); };
virtual void open_stream_sink(google::protobuf::RpcController* controller,
const POpenStreamSinkRequest*,
Expand Down Expand Up @@ -101,20 +101,17 @@ class StreamSinkFileWriterTest : public testing::Test {
options.timeout_ms = FLAGS_timeout_ms;
options.max_retry = FLAGS_max_retry;
std::stringstream port;
while (true) {
port << "0.0.0.0:" << (rand() % 1000 + 8000);
if (channel.Init(port.str().c_str(), NULL) == 0) {
break;
}
port.clear();
}
CHECK_EQ(0, channel.Init("127.0.0.1:18946", nullptr));

// init server
_stream_service = new StreamingSinkFileService(&channel);
_stream_service = new StreamingSinkFileService();
CHECK_EQ(0, _server.AddService(_stream_service, brpc::SERVER_DOESNT_OWN_SERVICE));
brpc::ServerOptions server_options;
server_options.idle_timeout_sec = FLAGS_idle_timeout_s;
CHECK_EQ(0, _server.Start(port.str().c_str(), &server_options));
{
debug::ScopedLeakCheckDisabler disable_lsan;
CHECK_EQ(0, _server.Start("127.0.0.1:18946", &server_options));
}

// init stream connect
PBackendService_Stub stub(&channel);
Expand Down Expand Up @@ -176,5 +173,4 @@ TEST_F(StreamSinkFileWriterTest, TestFinalize) {
writer.init(load_id, 3, 4, 5, 6);
CHECK_STATUS_OK(writer.finalize());
}

} // namespace doris
84 changes: 76 additions & 8 deletions be/test/runtime/load_stream_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
#include "gen_cpp/BackendService_types.h"
#include "gen_cpp/FrontendService_types.h"
#include "gtest/gtest_pred_impl.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/tablet_manager.h"
#include "olap/txn_manager.h"
#include "runtime/descriptor_helper.h"
#include "runtime/exec_env.h"
#include "runtime/load_stream_mgr.h"
#include "util/debug/leakcheck_disabler.h"

using namespace brpc;

Expand Down Expand Up @@ -334,6 +336,68 @@ class LoadStreamMgrTest : public testing::Test {
void on_closed(StreamId id) override { std::cerr << "on_closed" << std::endl; }
};

class StreamService : public PBackendService {
public:
StreamService() : _sd(brpc::INVALID_STREAM_ID) {}
virtual ~StreamService() { brpc::StreamClose(_sd); };
virtual void open_stream_sink(google::protobuf::RpcController* controller,
const POpenStreamSinkRequest* request,
POpenStreamSinkResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
std::unique_ptr<PStatus> status = std::make_unique<PStatus>();
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
brpc::StreamOptions stream_options;

for (const auto& req : request->tablets()) {
TabletManager* tablet_mgr = StorageEngine::instance()->tablet_manager();
TabletSharedPtr tablet = tablet_mgr->get_tablet(req.tablet_id());
if (tablet == nullptr) {
cntl->SetFailed("Tablet not found");
status->set_status_code(TStatusCode::NOT_FOUND);
response->set_allocated_status(status.get());
response->release_status();
return;
}
auto resp = response->add_tablet_schemas();
resp->set_index_id(req.index_id());
resp->set_enable_unique_key_merge_on_write(
tablet->enable_unique_key_merge_on_write());
tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
}

ExecEnv* env = ExecEnv::GetInstance();

auto load_stream_mgr = env->get_load_stream_mgr();
LoadStreamSharedPtr load_stream;
auto st = load_stream_mgr->try_open_load_stream(request, &load_stream);

stream_options.handler = load_stream.get();

StreamId streamid;
if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {
cntl->SetFailed("Fail to accept stream");
status->set_status_code(TStatusCode::CANCELLED);
response->set_allocated_status(status.get());
response->release_status();
return;
}

load_stream->add_rpc_stream();
LOG(INFO) << "OOXXOO: get streamid =" << streamid;

status->set_status_code(TStatusCode::OK);
response->set_allocated_status(status.get());
response->release_status();
}

brpc::StreamId get_stream() const { return _sd; }

private:
Handler _receiver;
brpc::StreamId _sd;
};

class MockSinkClient {
public:
MockSinkClient() = default;
Expand Down Expand Up @@ -494,6 +558,7 @@ class LoadStreamMgrTest : public testing::Test {
}

void SetUp() override {
_server = new brpc::Server();
srand(time(nullptr));
char buffer[MAX_PATH_LEN];
EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
Expand All @@ -519,11 +584,14 @@ class LoadStreamMgrTest : public testing::Test {

z_engine->start_bg_threads();

_internal_service = new PInternalServiceImpl(_env);
CHECK_EQ(0, _server.AddService(_internal_service, brpc::SERVER_DOESNT_OWN_SERVICE));
_stream_service = new StreamService();
CHECK_EQ(0, _server->AddService(_stream_service, brpc::SERVER_OWNS_SERVICE));
brpc::ServerOptions server_options;
server_options.idle_timeout_sec = 300;
CHECK_EQ(0, _server.Start("127.0.0.1:18947", &server_options)); // TODO: make port random
{
debug::ScopedLeakCheckDisabler disable_lsan;
CHECK_EQ(0, _server->Start("127.0.0.1:18947", &server_options));
}

for (int i = 0; i < 3; i++) {
TCreateTabletReq request;
Expand All @@ -540,9 +608,9 @@ class LoadStreamMgrTest : public testing::Test {
z_engine = nullptr;
}
delete _env->_load_stream_mgr;
_server.Stop(1000);
CHECK_EQ(0, _server.Join());
delete _internal_service;
_server->Stop(1000);
CHECK_EQ(0, _server->Join());
SAFE_DELETE(_server);
}

std::string read_data(int64_t txn_id, int64_t partition_id, int64_t tablet_id, uint32_t segid) {
Expand Down Expand Up @@ -572,8 +640,8 @@ class LoadStreamMgrTest : public testing::Test {
}

ExecEnv* _env;
brpc::Server _server;
PInternalServiceImpl* _internal_service;
brpc::Server* _server;
StreamService* _stream_service;
};

// <client, index, bucket>
Expand Down

0 comments on commit f314acd

Please sign in to comment.