Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix machine hostname is resolved to loopback address #34

Merged
merged 9 commits into from
Aug 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "olap/utils.h"
#include "common/resource_tls.h"
#include "agent/cgroups_mgr.h"
#include "service/backend_options.h"

using std::deque;
using std::list;
Expand Down Expand Up @@ -69,7 +70,7 @@ TaskWorkerPool::TaskWorkerPool(
_agent_utils = new AgentUtils();
_master_client = new MasterServerClient(_master_info, &_master_service_client_cache);
_command_executor = new CommandExecutor();
_backend.__set_host(_agent_utils->get_local_ip());
_backend.__set_host(BackendOptions::get_localhost());
_backend.__set_be_port(config::be_port);
_backend.__set_http_port(config::webserver_port);
}
Expand Down
19 changes: 0 additions & 19 deletions be/src/agent/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -260,25 +260,6 @@ AgentStatus AgentUtils::rsync_from_remote(
return PALO_SUCCESS;
}

char* AgentUtils::get_local_ip() {
char hname[128];
gethostname(hname, sizeof(hname));

// Let's hope this is not broken in the glibc we're using
struct hostent hent;
struct hostent *he = 0;
char hbuf[2048];
int err = 0;
if (gethostbyname_r(hname, &hent, hbuf, sizeof(hbuf), &he, &err) != 0
|| he == 0) {
LOG(ERROR) << "gethostbyname : " << hname << ", "
<< "error: " << err;
return NULL;
}

return inet_ntoa(*(struct in_addr*)(he->h_addr_list[0]));
}

std::string AgentUtils::print_agent_status(AgentStatus status) {
switch (status) {
case PALO_SUCCESS:
Expand Down
3 changes: 0 additions & 3 deletions be/src/agent/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,6 @@ class AgentUtils {
const uint32_t transport_speed_limit_kbps,
const uint32_t timeout_second);

// Get ip of local service
virtual char* get_local_ip();

// Print AgentStatus as string
virtual std::string print_agent_status(AgentStatus status);

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/analytic_eval_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include "exprs/agg_fn_evaluator.h"
#include "exprs/anyval_util.h"

#include "runtime/buffered_tuple_stream.hpp"
#include "runtime/buffered_tuple_stream.h"
#include "runtime/descriptors.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
Expand Down
4 changes: 2 additions & 2 deletions be/src/exec/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "olap_scan_node.h"
#include "olap_utils.h"
#include "olap/olap_reader.h"
#include "service/backend_options.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/mem_pool.h"
Expand Down Expand Up @@ -135,8 +136,7 @@ Status OlapScanner::open() {
fetch_request.__set_aggregation(_aggregation);

if (!_reader->init(fetch_request, &_vec_conjunct_ctxs, _profile).ok()) {
std::string local_ip;
get_local_ip(&local_ip);
std::string local_ip = BackendOptions::get_localhost();
std::stringstream ss;
if (MemTracker::limit_exceeded(*_runtime_state->mem_trackers())) {
ss << "Memory limit exceeded. Tablet: " << fetch_request.tablet_id << ". host: " << local_ip;
Expand Down
3 changes: 2 additions & 1 deletion be/src/http/action/mini_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "http/http_channel.h"
#include "http/http_parser.h"
#include "olap/file_helper.h"
#include "service/backend_options.h"
#include "util/url_coding.h"
#include "util/file_utils.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -328,7 +329,7 @@ Status MiniLoadAction::load(
}
req.__set_properties(params);
req.files.push_back(file_path);
req.backend.__set_hostname(*_exec_env->local_ip());
req.backend.__set_hostname(BackendOptions::get_localhost());
req.backend.__set_port(config::be_port);

struct timeval tv;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/runtime")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/runtime")

add_library(Runtime STATIC
broker_mgr.cpp
broker_mgr.cpp
buffered_block_mgr.cpp
buffered_tuple_stream.cpp
buffered_tuple_stream_ir.cpp
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/broker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "common/config.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/TPaloBrokerService.h"
#include "service/backend_options.h"
#include "runtime/exec_env.h"
#include "runtime/client_cache.h"
#include "util/thrift_util.h"
Expand All @@ -37,7 +38,7 @@ BrokerMgr::~BrokerMgr() {

void BrokerMgr::init() {
std::stringstream ss;
ss << *_exec_env->local_ip() << ":" << config::be_port;
ss << BackendOptions::get_localhost() << ":" << config::be_port;
_client_id = ss.str();
}

Expand Down
52 changes: 52 additions & 0 deletions be/src/runtime/buffered_tuple_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,4 +439,56 @@ int BufferedTupleStream::compute_row_size(TupleRow* row) const {

return size;
}

inline uint8_t* BufferedTupleStream::allocate_row(int size) {
DCHECK(!_closed);

if (UNLIKELY(_write_block == NULL || _write_block->bytes_remaining() < size)) {
bool got_block = false;
_status = new_block_for_write(size, &got_block);

if (!_status.ok() || !got_block) {
return NULL;
}
}

DCHECK(_write_block != NULL);
// DCHECK(_write_block->is_pinned());
DCHECK_GE(_write_block->bytes_remaining(), size);
++_num_rows;
_write_block->add_row();
return _write_block->allocate<uint8_t>(size);
}

inline void BufferedTupleStream::get_tuple_row(const RowIdx& idx, TupleRow* row) const {
DCHECK(!_closed);
//DCHECK(is_pinned());
DCHECK(!_delete_on_read);
DCHECK_EQ(_blocks.size(), _block_start_idx.size());
DCHECK_LT(idx.block(), _blocks.size());

uint8_t* data = _block_start_idx[idx.block()] + idx.offset();

if (_nullable_tuple) {
// Stitch together the tuples from the block and the NULL ones.
const int tuples_per_row = _desc.tuple_descriptors().size();
uint32_t tuple_idx = idx.idx() * tuples_per_row;

for (int i = 0; i < tuples_per_row; ++i) {
const uint8_t* null_word = _block_start_idx[idx.block()] + (tuple_idx >> 3);
const uint32_t null_pos = tuple_idx & 7;
const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
row->set_tuple(i, reinterpret_cast<Tuple*>(
reinterpret_cast<uint64_t>(data) * is_not_null));
data += _desc.tuple_descriptors()[i]->byte_size() * is_not_null;
++tuple_idx;
}
} else {
for (int i = 0; i < _desc.tuple_descriptors().size(); ++i) {
row->set_tuple(i, reinterpret_cast<Tuple*>(data));
data += _desc.tuple_descriptors()[i]->byte_size();
}
}
}

}
17 changes: 17 additions & 0 deletions be/src/runtime/buffered_tuple_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,23 @@ class BufferedTupleStream {
int compute_num_null_indicator_bytes(int block_size) const;
};

inline bool BufferedTupleStream::add_row(TupleRow* row, uint8_t** dst) {
DCHECK(!_closed);

if (LIKELY(deep_copy(row, dst))) {
return true;
}

bool got_block = false;
_status = new_block_for_write(compute_row_size(row), &got_block);

if (!_status.ok() || !got_block) {
return false;
}

return deep_copy(row, dst);
}

}

#endif
101 changes: 0 additions & 101 deletions be/src/runtime/buffered_tuple_stream.hpp

This file was deleted.

2 changes: 1 addition & 1 deletion be/src/runtime/buffered_tuple_stream_ir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// specific language governing permissions and limitations
// under the License.

#include "runtime/buffered_tuple_stream.hpp"
#include "runtime/buffered_tuple_stream.h"

#include "runtime/descriptors.h"
#include "runtime/tuple_row.h"
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/etl_job_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "gen_cpp/Status_types.h"
#include "gen_cpp/Types_types.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
#include "runtime/exec_env.h"
#include "runtime/plan_fragment_executor.h"
Expand All @@ -37,7 +38,7 @@ namespace palo {

std::string EtlJobMgr::to_http_path(const std::string& file_name) {
std::stringstream url;
url << "http://" << *_exec_env->local_ip() << ":" << config::webserver_port
url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
<< "/api/_download_load?file=" << file_name;
return url.str();
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ ExecEnv::ExecEnv() :
_fragment_mgr(new FragmentMgr(this)),
_master_info(new TMasterInfo()),
_etl_job_mgr(new EtlJobMgr(this)),
_local_ip(new std::string()),
_load_path_mgr(new LoadPathMgr()),
_disk_io_mgr(new DiskIoMgr()),
_tmp_file_mgr(new TmpFileMgr),
Expand All @@ -96,7 +95,6 @@ ExecEnv::ExecEnv() :
_broker_mgr(new BrokerMgr(this)),
_enable_webserver(true),
_tz_database(TimezoneDatabase()) {
get_local_ip(_local_ip.get());
_client_cache->init_metrics(_metrics.get(), "palo.backends");
//_frontend_client_cache->init_metrics(_metrics.get(), "frontend-server.backends");
_result_mgr->init();
Expand Down
5 changes: 0 additions & 5 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,6 @@ class ExecEnv {
return _tmp_file_mgr.get();
}

std::string* local_ip() {
return _local_ip.get();
}

BfdParser* bfd_parser() const {
return _bfd_parser.get();
}
Expand Down Expand Up @@ -184,7 +180,6 @@ class ExecEnv {
boost::scoped_ptr<FragmentMgr> _fragment_mgr;
boost::scoped_ptr<TMasterInfo> _master_info;
boost::scoped_ptr<EtlJobMgr> _etl_job_mgr;
boost::scoped_ptr<std::string> _local_ip;
boost::scoped_ptr<LoadPathMgr> _load_path_mgr;
boost::scoped_ptr<DiskIoMgr> _disk_io_mgr;
boost::scoped_ptr<TmpFileMgr> _tmp_file_mgr;
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "agent/cgroups_mgr.h"
#include "common/resource_tls.h"
#include "service/backend_options.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/exec_env.h"
#include "runtime/datetime_value.h"
Expand Down Expand Up @@ -208,7 +209,7 @@ void FragmentExecState::callback(const Status& status, RuntimeProfile* profile,

std::string FragmentExecState::to_http_path(const std::string& file_name) {
std::stringstream url;
url << "http://" << *_exec_env->local_ip() << ":" << config::webserver_port
url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
<< "/api/_download_load?file=" << file_name;
return url.str();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "exprs/expr.h"
#include "runtime/buffered_block_mgr.h"
#include "runtime/buffered_block_mgr2.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/load_path_mgr.h"
// #include "runtime/data_stream_recvr.hpp"
#include "util/cpu_info.h"
#include "util/mem_info.h"
#include "util/debug_util.h"
Expand Down
Loading