Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raft cache persist improve #5

Merged
merged 6 commits into from
Mar 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ namespace ErrorCodes
extern const int THEFLASH_ENCODER_ERROR = 9002;
extern const int THEFLASH_SESSION_ERROR = 9003;
extern const int DECIMAL_OVERFLOW_ERROR = 9004;
extern const int FILE_SIZE_NOT_MATCH = 9005;
extern const int LOCK_EXCEPTION = 10000;
}

Expand Down
173 changes: 96 additions & 77 deletions dbms/src/Common/PersistedContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,71 @@
#include <Poco/Path.h>

#include <Core/Types.h>
#include <IO/HashingReadBuffer.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>

// TODO find a way to unify PersistedContainerSetOrVector and PersistedContainerMap, and Write & Read should be able to use lambda.
namespace DB
{

namespace ErrorCodes
{
extern const int CHECKSUM_DOESNT_MATCH;
}

constexpr UInt8 PERSISTED_CONTAINER_MAGIC_WORD = 0xFF;
constexpr size_t HASH_CODE_LENGTH = sizeof(DB::HashingWriteBuffer::uint128);

template <bool is_set, typename T, template <typename E = T, typename...> class Container, class Write, class Read>
struct PersistedContainerSetOrVector
template <bool is_map, bool is_set, class Trait>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can make abstraction in the concept of Iterator later if have time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is map use K and V, while others use T only. I can unify map by std::pair<K, V>, but not user friendly.

struct PersistedContainer : public Trait
{
public:
PersistedContainerSetOrVector(const std::string & path_) : path(path_) {}
using Container = typename Trait::Container;
using Write = typename Trait::Write;
using Read = typename Trait::Read;

Container<T> & get() { return container; }
explicit PersistedContainer(const std::string & path_) : path(path_) {}

auto & get() { return container; }

void persist()
{
std::string tmp_file_path = path + ".tmp." + DB::toString(Poco::Timestamp().epochMicroseconds());
DB::WriteBufferFromFile file_buf(tmp_file_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT);
size_t size = container.size();
writeIntBinary(size, file_buf);
for (const T & t : container)
{
write(t, file_buf);
}
file_buf.next();
file_buf.sync();
DB::WriteBufferFromFile file_buf(tmp_file_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT);
file_buf.seek(HASH_CODE_LENGTH);

DB::HashingWriteBuffer hash_buf(file_buf);
size_t size = container.size();
writeIntBinary(size, hash_buf);
if constexpr (is_map)
{
for (auto && [k, v] : container)
{
writeIntBinary(PERSISTED_CONTAINER_MAGIC_WORD, hash_buf);
write(k, v, hash_buf);
}
}
else
{
for (const auto & t : container)
{
writeIntBinary(PERSISTED_CONTAINER_MAGIC_WORD, hash_buf);
write(t, hash_buf);
}
}
hash_buf.next();

auto hashcode = hash_buf.getHash();
file_buf.seek(0);
writeIntBinary(hashcode.first, file_buf);
writeIntBinary(hashcode.second, file_buf);

file_buf.sync();
}
Poco::File(tmp_file_path).renameTo(path);
}

Expand All @@ -46,100 +83,80 @@ struct PersistedContainerSetOrVector
if (!Poco::File(path).exists())
return;
DB::ReadBufferFromFile file_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY);

DB::HashingReadBuffer::uint128 expected_hashcode;
readIntBinary(expected_hashcode.first, file_buf);
readIntBinary(expected_hashcode.second, file_buf);

DB::HashingReadBuffer hash_buf(file_buf);
size_t size;
readIntBinary(size, file_buf);
readIntBinary(size, hash_buf);
UInt8 word;
for (size_t i = 0; i < size; ++i)
{
if constexpr (is_set)
readIntBinary(word, hash_buf);
if (word != PERSISTED_CONTAINER_MAGIC_WORD)
throw DB::Exception("Magic word does not match!", DB::ErrorCodes::CHECKSUM_DOESNT_MATCH);

if constexpr (is_map)
{
const auto && [k, v] = read(hash_buf);
container.emplace(k, v);
}
else if constexpr (is_set)
{
container.insert(std::move(read(file_buf)));
container.insert(std::move(read(hash_buf)));
}
else
{
container.push_back(std::move(read(file_buf)));
container.push_back(std::move(read(hash_buf)));
}
}
auto hashcode = hash_buf.getHash();
if (hashcode != expected_hashcode)
throw DB::Exception("Hashcode does not match!", DB::ErrorCodes::CHECKSUM_DOESNT_MATCH);
}

void drop()
{
Poco::File f(path);
if (f.exists())
f.remove(false);
Container<T> tmp;
Container tmp;
container.swap(tmp);
}

private:
std::string path;
Container<T> container;
Container container;
Write write{};
Read read{};
};

template <typename Key,
typename Value,
template <typename CKey = Key, typename CValue = Value, typename...> class Container,
class Write,
class Read>
struct PersistedContainerMap
template <typename K, typename V, template <typename...> class C, typename W, typename R>
struct MapTrait
{
public:
PersistedContainerMap(const std::string & path_) : path(path_) {}

Container<Key, Value> & get() { return container; }

void persist()
{
std::string tmp_file_path = path + ".tmp." + DB::toString(Poco::Timestamp().epochMicroseconds());
DB::WriteBufferFromFile file_buf(tmp_file_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT);
size_t size = container.size();
writeIntBinary(size, file_buf);
for (auto && [k, v] : container)
{
write(k, v, file_buf);
}
file_buf.next();
file_buf.sync();

Poco::File(tmp_file_path).renameTo(path);
}

void restore()
{
if (!Poco::File(path).exists())
return;
DB::ReadBufferFromFile file_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY);
size_t size;
readIntBinary(size, file_buf);
for (size_t i = 0; i < size; ++i)
{
const auto && [k, v] = read(file_buf);
container.emplace(k, v);
}
}

void drop()
{
Poco::File f(path);
if (f.exists())
f.remove(false);
Container<Key, Value> tmp;
container.swap(tmp);
}
using Container = C<K, V>;
using Write = W;
using Read = R;
};

private:
std::string path;
Container<Key, Value> container;
Write write{};
Read read{};
template <typename T, template <typename...> class C, typename W, typename R>
struct VecSetTrait
{
using Container = C<T>;
using Write = W;
using Read = R;
};

template <typename T, template <typename E = T, typename...> class Container, class Write, class Read>
using PersistedContainerSet = PersistedContainerSetOrVector<true, T, Container, Write, Read>;
template <typename K, typename V, template <typename...> class C, class Write, class Read>
using PersistedContainerMap = PersistedContainer<true, false, MapTrait<K, V, C, Write, Read>>;

template <typename T, template <typename...> class C, class Write, class Read>
using PersistedContainerSet = PersistedContainer<false, true, VecSetTrait<T, C, Write, Read>>;

template <typename T, template <typename E = T, typename...> class Container, class Write, class Read>
using PersistedContainerVector = PersistedContainerSetOrVector<false, T, Container, Write, Read>;
template <typename T, template <typename...> class C, class Write, class Read>
using PersistedContainerVector = PersistedContainer<false, false, VecSetTrait<T, C, Write, Read>>;

struct UInt64Write
{
Expand Down Expand Up @@ -177,3 +194,5 @@ struct UInt64StringRead
};
using PersistedUnorderedUInt64ToStringMap
= PersistedContainerMap<UInt64, std::string, std::unordered_map, UInt64StringWrite, UInt64StringRead>;

} // namespace DB
24 changes: 14 additions & 10 deletions dbms/src/Common/tests/persisted_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@

#include <Common/PersistedContainer.h>

using namespace DB;

int main(int, char **)
{
auto clear_file = [=](std::string path) {
Poco::File file(path);
if (file.exists())
file.remove();
};

{
std::string file_path = "persisted_container_set_test.dat";
SCOPE_EXIT({
Poco::File file(file_path);
if (file.exists())
file.remove();
});
clear_file(file_path);
SCOPE_EXIT({ clear_file(file_path); });

{
PersistedUnorderedUInt64Set set(file_path);
set.restore();
Expand All @@ -38,11 +44,9 @@ int main(int, char **)

{
std::string file_path = "persisted_container_map_test.dat";
SCOPE_EXIT({
Poco::File file(file_path);
if (file.exists())
file.remove();
});
clear_file(file_path);
SCOPE_EXIT({ clear_file(file_path); });

{
PersistedUnorderedUInt64ToStringMap map(file_path);
map.restore();
Expand Down
96 changes: 96 additions & 0 deletions dbms/src/Storages/Transaction/HashCheckHelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#include <fcntl.h>

#include <Storages/Transaction/HashCheckHelper.h>

namespace DB
{

namespace ErrorCodes
{
extern const int FILE_SIZE_NOT_MATCH;
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_OPEN_FILE;
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR;
extern const int UNEXPECTED_END_OF_FILE;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int CANNOT_SEEK_THROUGH_FILE;
} // namespace ErrorCodes


namespace FileHashCheck
{
void readFileFully(const std::string & path, int fd, off_t file_offset, size_t read_size, char * data)
{

if (-1 == ::lseek(fd, file_offset, SEEK_SET))
throwFromErrno("Cannot seek through file " + path, ErrorCodes::CANNOT_SEEK_THROUGH_FILE);

char * pos = data;
size_t remain = read_size;
while (remain)
{
auto res = ::read(fd, pos, remain);
if (-1 == res && errno != EINTR)
throwFromErrno("Cannot read from file " + path, ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
if (!res && errno != EINTR)
throwFromErrno("End of file", ErrorCodes::UNEXPECTED_END_OF_FILE);

remain -= res;
pos += res;
}
}

void checkObjectHashInFile(const std::string & path, const std::vector<size_t> & object_bytes, const std::vector<bool> & use,
const std::vector<uint128> & expected_hash_codes, size_t block_size)
{
Poco::File file(path);
size_t file_size = file.getSize();
size_t total_size = 0;
size_t max_size = 0;
for (auto b : object_bytes)
{
total_size += b;
max_size = std::max(max_size, b);
}
if (total_size != file_size)
throw Exception("File size not match! Expected: " + DB::toString(total_size) + ", got: " + DB::toString(file_size),
ErrorCodes::FILE_SIZE_NOT_MATCH);

char * object_data_buf = (char *)malloc(max_size);
SCOPE_EXIT({ free(object_data_buf); });

auto fd = open(path.c_str(), O_RDONLY);
if (-1 == fd)
throwFromErrno("Cannot open file " + path, errno == ENOENT ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE);
SCOPE_EXIT({ ::close(fd); });

off_t file_offset = 0;
for (size_t index = 0; index < object_bytes.size(); ++index)
{
if (use[index])
{
uint128 hashcode{0, 0};
size_t bytes = object_bytes[index];
char * pos = object_data_buf;

readFileFully(path, fd, file_offset, bytes, object_data_buf);

while (bytes)
{
auto to_cal_bytes = std::min(bytes, block_size);
hashcode = CityHash_v1_0_2::CityHash128WithSeed(pos, to_cal_bytes, hashcode);
pos += to_cal_bytes;
bytes -= to_cal_bytes;
}

if (hashcode != expected_hash_codes[index])
throw Exception(
"File " + path + " hash code not match at object index: " + DB::toString(index), ErrorCodes::CHECKSUM_DOESNT_MATCH);
}

file_offset += object_bytes[index];
}
}
} // namespace FileHashCheck

} // namespace DB
Loading