Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UniqueFD and ScopeExit #973

Merged
merged 11 commits into from
Oct 12, 2022
79 changes: 79 additions & 0 deletions src/fd_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once

#include <unistd.h>
#include <utility>

constexpr const int NullFD = -1;

// just like an unique_ptr, but for (int) fd
struct UniqueFD {
UniqueFD() : fd_(NullFD) {}
explicit UniqueFD(int fd) : fd_(fd) {}

~UniqueFD() { if (fd_ != NullFD) close(fd_); }

UniqueFD(const UniqueFD&) = delete;
UniqueFD(UniqueFD&& f) : fd_(f.fd_) { f.fd_ = NullFD; }

UniqueFD& operator=(const UniqueFD&) = delete;
UniqueFD& operator=(UniqueFD&& f) {
fd_ = f.fd_;
f.fd_ = NullFD;
return *this;
}

int Release() {
int fd = fd_;
fd_ = NullFD;
return fd;
}

void Reset(int fd = NullFD) {
int old_fd = fd_;
fd_ = fd;
if (old_fd != NullFD) close(old_fd);
}

void Close() {
Reset();
}

void Swap(UniqueFD& other) {
std::swap(fd_, other.fd_);
}

int Get() const { return fd_; }
int operator*() const { return fd_; }
explicit operator bool() const { return fd_ != NullFD; }

bool operator==(const UniqueFD& f) const {
return fd_ == f.fd_;
}

bool operator!=(const UniqueFD& f) const {
return !(*this == f);
}

private:
int fd_;
};
16 changes: 7 additions & 9 deletions src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "config.h"
#include "server.h"
#include "util.h"
#include "fd_util.h"

namespace google {
bool Symbolize(void* pc, char* out, size_t out_size);
Expand Down Expand Up @@ -169,8 +170,8 @@ bool supervisedSystemd() {
return false;
}

int fd = 1;
if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) == -1) {
auto fd = UniqueFD(socket(AF_UNIX, SOCK_DGRAM, 0));
if (!fd) {
LOG(WARNING) << "Can't connect to systemd socket " << notify_socket;
return false;
}
Expand Down Expand Up @@ -201,12 +202,10 @@ bool supervisedSystemd() {
#ifdef HAVE_MSG_NOSIGNAL
sendto_flags |= MSG_NOSIGNAL;
#endif
if (sendmsg(fd, &hdr, sendto_flags) < 0) {
if (sendmsg(*fd, &hdr, sendto_flags) < 0) {
LOG(WARNING) << "Can't send notification to systemd";
close(fd);
return false;
}
close(fd);
return true;
}

Expand All @@ -229,13 +228,12 @@ bool isSupervisedMode(int mode) {
}

static Status createPidFile(const std::string &path) {
int fd = open(path.data(), O_RDWR|O_CREAT, 0660);
if (fd < 0) {
auto fd = UniqueFD(open(path.data(), O_RDWR|O_CREAT, 0660));
if (!fd) {
return Status(Status::NotOK, strerror(errno));
}
std::string pid_str = std::to_string(getpid());
write(fd, pid_str.data(), pid_str.size());
close(fd);
write(*fd, pid_str.data(), pid_str.size());
return Status::OK();
}

Expand Down
18 changes: 9 additions & 9 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <thread>
#include <utility>

#include "fd_util.h"
#include "cluster.h"
#include "log_collector.h"
#include "parse_util.h"
Expand Down Expand Up @@ -4603,6 +4604,8 @@ class CommandFetchMeta : public Commander {
// Feed-replica-meta thread
std::thread t = std::thread([svr, repl_fd, ip]() {
Util::ThreadSetName("feed-repl-info");
UniqueFD unique_fd{repl_fd};

std::string files;
auto s = Engine::Storage::ReplDataManager::GetFullReplDataInfo(
svr->storage_, &files);
Expand All @@ -4611,7 +4614,6 @@ class CommandFetchMeta : public Commander {
write(repl_fd, message, strlen(message));
LOG(WARNING) << "[replication] Failed to get full data file info,"
<< " error: " << s.Msg();
close(repl_fd);
return;
}
// Send full data file info
Expand All @@ -4622,7 +4624,6 @@ class CommandFetchMeta : public Commander {
<< ip << ", error: " << strerror(errno);
}
svr->storage_->SetCheckpointAccessTime(std::time(nullptr));
close(repl_fd);
});
t.detach();

Expand All @@ -4649,6 +4650,7 @@ class CommandFetchFile : public Commander {

std::thread t = std::thread([svr, repl_fd, ip, files]() {
Util::ThreadSetName("feed-repl-file");
UniqueFD unique_fd{repl_fd};
svr->IncrFetchFileThread();

for (auto file : files) {
Expand All @@ -4659,22 +4661,21 @@ class CommandFetchFile : public Commander {
svr->GetFetchFileThreadNum();
}
auto start = std::chrono::high_resolution_clock::now();
auto fd = Engine::Storage::ReplDataManager::OpenDataFile(svr->storage_,
file, &file_size);
if (fd < 0) break;
auto fd = UniqueFD(Engine::Storage::ReplDataManager::OpenDataFile(svr->storage_,
file, &file_size));
if (!fd) break;

// Send file size and content
if (Util::SockSend(repl_fd, std::to_string(file_size)+CRLF).IsOK() &&
Util::SockSendFile(repl_fd, fd, file_size).IsOK()) {
Util::SockSendFile(repl_fd, *fd, file_size).IsOK()) {
LOG(INFO) << "[replication] Succeed sending file " << file << " to "
<< ip;
} else {
LOG(WARNING) << "[replication] Fail to send file " << file << " to "
<< ip << ", error: " << strerror(errno);
close(fd);
break;
}
close(fd);
fd.Close();

// Sleep if the speed of sending file is more than replication speed limit
auto end = std::chrono::high_resolution_clock::now();
Expand All @@ -4691,7 +4692,6 @@ class CommandFetchFile : public Commander {
}
svr->storage_->SetCheckpointAccessTime(std::time(nullptr));
svr->DecrFetchFileThread();
close(repl_fd);
});
t.detach();

Expand Down
4 changes: 2 additions & 2 deletions src/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <thread>

#include "event_util.h"
#include "fd_util.h"
#include "redis_reply.h"
#include "rocksdb_crc32c.h"
#include "server.h"
Expand Down Expand Up @@ -748,9 +749,9 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
if (!s.IsOK()) {
return Status(Status::NotOK, "connect the server err: " + s.Msg());
}
UniqueFD unique_fd{sock_fd};
s = this->sendAuth(sock_fd);
torwig marked this conversation as resolved.
Show resolved Hide resolved
if (!s.IsOK()) {
close(sock_fd);
return Status(Status::NotOK, "sned the auth command err: " + s.Msg());
}
std::vector<std::string> fetch_files;
Expand Down Expand Up @@ -796,7 +797,6 @@ Status ReplicationThread::parallelFetchFile(const std::string &dir,
s = this->fetchFiles(sock_fd, dir, fetch_files, crcs, fn);
}
}
close(sock_fd);
return s;
}));
}
Expand Down
54 changes: 54 additions & 0 deletions src/scope_exit.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once

#include <utility>

// ref to https://en.cppreference.com/w/cpp/experimental/scope_exit
template <typename F>
struct ScopeExit {
explicit ScopeExit(F f, bool enabled) : enabled_(enabled), f_(std::move(f)) {}
explicit ScopeExit(F f) : enabled_(true), f_(std::move(f)) {}

ScopeExit(const ScopeExit&) = delete;
ScopeExit(ScopeExit&& se) : enabled_(se.enabled_), f_(std::move(se.f_)) {}

~ScopeExit() {
if (enabled_) f_();
}

void Enable() {
enabled_ = false;
}

void Disable() {
enabled_ = true;
}

bool enabled_;
F f_;
};

// use CTAD in C++17 or above
template <typename F>
ScopeExit<F> MakeScopeExit(F&& f, bool enabled = true) {
return ScopeExit<F>(std::forward<F>(f), enabled);
}
24 changes: 10 additions & 14 deletions src/stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include "stats.h"
#include <chrono>
#include "util.h"

Stats::Stats() {
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
Expand Down Expand Up @@ -53,19 +54,20 @@ int64_t Stats::GetMemoryRSS() {
#include <cstdio>
#include <cstring>

#include "fd_util.h"

int64_t Stats::GetMemoryRSS() {
int fd, count;
char buf[4096], filename[256];
snprintf(filename, sizeof(filename), "/proc/%d/stat", getpid());
if ((fd = open(filename, O_RDONLY)) == -1) return 0;
if (read(fd, buf, sizeof(buf)) <= 0) {
close(fd);
auto fd = UniqueFD(open(filename, O_RDONLY));
if (!fd) return 0;
if (read(*fd, buf, sizeof(buf)) <= 0) {
return 0;
}
close(fd);
fd.Close();

char *start = buf;
count = 23; // RSS is the 24th field in /proc/<pid>/stat
int count = 23; // RSS is the 24th field in /proc/<pid>/stat
while (start && count--) {
start = strchr(start, ' ');
if (start) start++;
Expand All @@ -88,20 +90,14 @@ void Stats::IncrLatency(uint64_t latency, const std::string &command_name) {
commands_stats[command_name].latency.fetch_add(latency, std::memory_order_relaxed);
}

uint64_t Stats::GetTimeStamp(void) {
auto tp = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now());
auto ts = std::chrono::duration_cast<std::chrono::milliseconds>(tp.time_since_epoch());
return ts.count();
}

void Stats::TrackInstantaneousMetric(int metric, uint64_t current_reading) {
uint64_t t = GetTimeStamp() - inst_metrics[metric].last_sample_time;
uint64_t t = Util::GetTimeStampMS() - inst_metrics[metric].last_sample_time;
uint64_t ops = current_reading - inst_metrics[metric].last_sample_count;
uint64_t ops_sec = t > 0 ? (ops*1000/t) : 0;
inst_metrics[metric].samples[inst_metrics[metric].idx] = ops_sec;
inst_metrics[metric].idx++;
inst_metrics[metric].idx %= STATS_METRIC_SAMPLES;
inst_metrics[metric].last_sample_time = GetTimeStamp();
inst_metrics[metric].last_sample_time = Util::GetTimeStampMS();
inst_metrics[metric].last_sample_count = current_reading;
}

Expand Down
1 change: 0 additions & 1 deletion src/stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ class Stats {
void IncrPSyncErrCounter() { psync_err_counter.fetch_add(1, std::memory_order_relaxed); }
void IncrPSyncOKCounter() { psync_ok_counter.fetch_add(1, std::memory_order_relaxed); }
static int64_t GetMemoryRSS();
uint64_t GetTimeStamp(void);
void TrackInstantaneousMetric(int metric, uint64_t current_reading);
uint64_t GetInstantaneousMetric(int metric);
};
2 changes: 2 additions & 0 deletions src/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class Status {

static Status OK() { return {}; }

static Status FromErrno() { return Status(NotOK, strerror(errno)); }

private:
Code code_;
std::string msg_;
Expand Down
3 changes: 2 additions & 1 deletion src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <memory>
#include <random>

#include "fd_util.h"
#include "compact_filter.h"
#include "config.h"
#include "event_listener.h"
Expand Down Expand Up @@ -853,7 +854,7 @@ int Storage::ReplDataManager::OpenDataFile(Storage *storage,
auto s = storage->env_->FileExists(abs_path);
if (!s.ok()) {
LOG(ERROR) << "[storage] Data file [" << abs_path << "] not found";
return -1;
return NullFD;
}
storage->env_->GetFileSize(abs_path, file_size);
auto rv = open(abs_path.c_str(), O_RDONLY);
Expand Down
Loading