Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raft cache persit improvement #4

Closed
wants to merge 22 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
611e1a1
Fix bug: RegionConcatedScanRemover may not traverse in order of _tidb…
solotzg Jan 29, 2019
2bbb958
kvproto's cpp files should be generated seperately
flowbehappy Jan 29, 2019
e9d5077
Merge branch 'raft' of github.com:pingcap/tics into raft
flowbehappy Jan 29, 2019
1b2384e
Use ReplacingTMTBlockInputStream and MvccTMTSortedBlockInputStream
innerr Jan 30, 2019
99214f2
Update contrib/kvproto
flowbehappy Jan 30, 2019
2ddc225
Use a seperated del-mark to handle region moving
innerr Feb 7, 2019
0068220
Fix some coding format
innerr Feb 7, 2019
eecf816
Make sql query not support Insert/Delete in txn engine.
solotzg Feb 11, 2019
c3eeeda
Check storage whether exists in removeRegion & splitRegion.
solotzg Feb 11, 2019
5705a85
Reverse definite-delmark to fix merging bug
innerr Feb 11, 2019
fc344a7
Update docker files
zanmato1984 Feb 12, 2019
08d0dc1
Fix build command in docker
zanmato1984 Feb 12, 2019
c35ee50
Use definite-del-mark to solve data tilt
innerr Feb 12, 2019
8cb44ba
Use self-built curl instead
zanmato1984 Feb 13, 2019
5e97627
Change builder image name
zanmato1984 Feb 15, 2019
42c6a91
Fix border check in RegionPartition.
solotzg Feb 15, 2019
b446f51
Merge pull request #1 from pingcap/raft-docker
zanmato1984 Feb 18, 2019
263a1a6
Add Exception ID in InterpreterInsertQuery and InterpreterDeleteQuery.
solotzg Feb 21, 2019
fb67fcd
Refactor region flush process.
flowbehappy Feb 13, 2019
d6c407b
Merge pull request #2 from pingcap/region-flush-refactor
flowbehappy Feb 22, 2019
c53aaea
Unify PersistedContainer Map, Vector and Set
flowbehappy Feb 21, 2019
960eb85
Add version to Region and RegionFile, and do hashcode check for regio…
flowbehappy Feb 26, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmake/find_kvproto.cmake
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# Currently kvproto should always use bundled library.

if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/kvproto/cpp/kvproto/errorpb.pb.h")
message (FATAL_ERROR "kvproto submodule in contrib/kvproto is missing.")
if (EXISTS "${ClickHouse_SOURCE_DIR}/contrib/kvproto/proto/errorpb.proto")
message (FATAL_ERROR "kvproto cpp files in contrib/kvproto is missing. Try go to contrib/kvproto, and run ./generate_cpp.sh")
else()
message (FATAL_ERROR "kvproto submodule in contrib/kvproto is missing. Try run 'git submodule update --init --recursive', and go to contrib/kvproto, and run ./generate_cpp.sh")
endif()
endif ()

message(STATUS "Using kvproto: ${ClickHouse_SOURCE_DIR}/contrib/kvproto/cpp")
Expand Down
2 changes: 1 addition & 1 deletion contrib/kvproto
Submodule kvproto updated 52 files
+4 −0 .gitignore
+0 −19 cpp/kvproto/coprocessor.grpc.pb.cc
+0 −32 cpp/kvproto/coprocessor.grpc.pb.h
+0 −1,400 cpp/kvproto/coprocessor.pb.cc
+0 −1,061 cpp/kvproto/coprocessor.pb.h
+0 −376 cpp/kvproto/debugpb.grpc.pb.cc
+0 −1,374 cpp/kvproto/debugpb.grpc.pb.h
+0 −9,247 cpp/kvproto/debugpb.pb.cc
+0 −5,557 cpp/kvproto/debugpb.pb.h
+0 −89 cpp/kvproto/enginepb.grpc.pb.cc
+0 −228 cpp/kvproto/enginepb.grpc.pb.h
+0 −3,682 cpp/kvproto/enginepb.pb.cc
+0 −2,172 cpp/kvproto/enginepb.pb.h
+0 −3,180 cpp/kvproto/eraftpb.pb.cc
+0 −1,991 cpp/kvproto/eraftpb.pb.h
+0 −19 cpp/kvproto/errorpb.grpc.pb.cc
+0 −32 cpp/kvproto/errorpb.grpc.pb.h
+0 −3,284 cpp/kvproto/errorpb.pb.cc
+0 −2,086 cpp/kvproto/errorpb.pb.h
+0 −220 cpp/kvproto/import_kvpb.grpc.pb.cc
+0 −759 cpp/kvproto/import_kvpb.grpc.pb.h
+0 −5,629 cpp/kvproto/import_kvpb.pb.cc
+0 −3,351 cpp/kvproto/import_kvpb.pb.h
+0 −142 cpp/kvproto/import_sstpb.grpc.pb.cc
+0 −459 cpp/kvproto/import_sstpb.grpc.pb.h
+0 −3,307 cpp/kvproto/import_sstpb.pb.cc
+0 −2,053 cpp/kvproto/import_sstpb.pb.h
+0 −19 cpp/kvproto/kvrpcpb.grpc.pb.cc
+0 −32 cpp/kvproto/kvrpcpb.grpc.pb.h
+0 −25,954 cpp/kvproto/kvrpcpb.pb.cc
+0 −17,617 cpp/kvproto/kvrpcpb.pb.h
+0 −19 cpp/kvproto/metapb.grpc.pb.cc
+0 −32 cpp/kvproto/metapb.grpc.pb.h
+0 −2,376 cpp/kvproto/metapb.pb.cc
+0 −1,490 cpp/kvproto/metapb.pb.h
+0 −633 cpp/kvproto/pdpb.grpc.pb.cc
+0 −2,255 cpp/kvproto/pdpb.grpc.pb.h
+0 −20,078 cpp/kvproto/pdpb.pb.cc
+0 −12,258 cpp/kvproto/pdpb.pb.h
+0 −19 cpp/kvproto/raft_cmdpb.grpc.pb.cc
+0 −32 cpp/kvproto/raft_cmdpb.grpc.pb.h
+0 −16,527 cpp/kvproto/raft_cmdpb.pb.cc
+0 −10,555 cpp/kvproto/raft_cmdpb.pb.h
+0 −19 cpp/kvproto/raft_serverpb.grpc.pb.cc
+0 −32 cpp/kvproto/raft_serverpb.grpc.pb.h
+0 −4,861 cpp/kvproto/raft_serverpb.pb.cc
+0 −2,992 cpp/kvproto/raft_serverpb.pb.h
+0 −818 cpp/kvproto/tikvpb.grpc.pb.cc
+0 −2,965 cpp/kvproto/tikvpb.grpc.pb.h
+0 −133 cpp/kvproto/tikvpb.pb.cc
+0 −75 cpp/kvproto/tikvpb.pb.h
+1 −0 generate_cpp.sh
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,7 @@ namespace ErrorCodes
extern const int THEFLASH_ENCODER_ERROR = 9002;
extern const int THEFLASH_SESSION_ERROR = 9003;
extern const int DECIMAL_OVERFLOW_ERROR = 9004;
extern const int SIZE_CHECK_FAILED = 9004;
extern const int LOCK_EXCEPTION = 10000;
}

Expand Down
173 changes: 96 additions & 77 deletions dbms/src/Common/PersistedContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,71 @@
#include <Poco/Path.h>

#include <Core/Types.h>
#include <IO/HashingReadBuffer.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>

// TODO find a way to unify PersistedContainerSetOrVector and PersistedContainerMap, and Write & Read should be able to use lambda.
namespace DB
{

namespace ErrorCodes
{
extern const int CHECKSUM_DOESNT_MATCH;
}

constexpr UInt8 PERSISTED_CONTAINER_MAGIC_WORD = 0xFF;
constexpr size_t HASH_CODE_LENGTH = sizeof(DB::HashingWriteBuffer::uint128);

template <bool is_set, typename T, template <typename E = T, typename...> class Container, class Write, class Read>
struct PersistedContainerSetOrVector
template <bool is_map, bool is_set, class Trait>
struct PersistedContainer : public Trait
{
public:
PersistedContainerSetOrVector(const std::string & path_) : path(path_) {}
using Container = typename Trait::Container;
using Write = typename Trait::Write;
using Read = typename Trait::Read;

Container<T> & get() { return container; }
explicit PersistedContainer(const std::string & path_) : path(path_) {}

auto & get() { return container; }

void persist()
{
std::string tmp_file_path = path + ".tmp." + DB::toString(Poco::Timestamp().epochMicroseconds());
DB::WriteBufferFromFile file_buf(tmp_file_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT);
size_t size = container.size();
writeIntBinary(size, file_buf);
for (const T & t : container)
{
write(t, file_buf);
}
file_buf.next();
file_buf.sync();
DB::WriteBufferFromFile file_buf(tmp_file_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT);
file_buf.seek(HASH_CODE_LENGTH);

DB::HashingWriteBuffer hash_buf(file_buf);
size_t size = container.size();
writeIntBinary(size, hash_buf);
if constexpr (is_map)
{
for (auto && [k, v] : container)
{
writeIntBinary(PERSISTED_CONTAINER_MAGIC_WORD, hash_buf);
write(k, v, hash_buf);
}
}
else
{
for (const auto & t : container)
{
writeIntBinary(PERSISTED_CONTAINER_MAGIC_WORD, hash_buf);
write(t, hash_buf);
}
}
hash_buf.next();

auto hashcode = hash_buf.getHash();
file_buf.seek(0);
writeIntBinary(hashcode.first, file_buf);
writeIntBinary(hashcode.second, file_buf);

file_buf.sync();
}
Poco::File(tmp_file_path).renameTo(path);
}

Expand All @@ -46,100 +83,80 @@ struct PersistedContainerSetOrVector
if (!Poco::File(path).exists())
return;
DB::ReadBufferFromFile file_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY);

DB::HashingReadBuffer::uint128 expected_hashcode;
readIntBinary(expected_hashcode.first, file_buf);
readIntBinary(expected_hashcode.second, file_buf);

DB::HashingReadBuffer hash_buf(file_buf);
size_t size;
readIntBinary(size, file_buf);
readIntBinary(size, hash_buf);
UInt8 word;
for (size_t i = 0; i < size; ++i)
{
if constexpr (is_set)
readIntBinary(word, hash_buf);
if (word != PERSISTED_CONTAINER_MAGIC_WORD)
throw DB::Exception("Magic word does not match!", DB::ErrorCodes::CHECKSUM_DOESNT_MATCH);

if constexpr (is_map)
{
const auto && [k, v] = read(hash_buf);
container.emplace(k, v);
}
else if constexpr (is_set)
{
container.insert(std::move(read(file_buf)));
container.insert(std::move(read(hash_buf)));
}
else
{
container.push_back(std::move(read(file_buf)));
container.push_back(std::move(read(hash_buf)));
}
}
auto hashcode = hash_buf.getHash();
if (hashcode != expected_hashcode)
throw DB::Exception("Hashcode does not match!", DB::ErrorCodes::CHECKSUM_DOESNT_MATCH);
}

void drop()
{
Poco::File f(path);
if (f.exists())
f.remove(false);
Container<T> tmp;
Container tmp;
container.swap(tmp);
}

private:
std::string path;
Container<T> container;
Container container;
Write write{};
Read read{};
};

template <typename Key,
typename Value,
template <typename CKey = Key, typename CValue = Value, typename...> class Container,
class Write,
class Read>
struct PersistedContainerMap
template <typename K, typename V, template <typename...> class C, typename W, typename R>
struct MapTrait
{
public:
PersistedContainerMap(const std::string & path_) : path(path_) {}

Container<Key, Value> & get() { return container; }

void persist()
{
std::string tmp_file_path = path + ".tmp." + DB::toString(Poco::Timestamp().epochMicroseconds());
DB::WriteBufferFromFile file_buf(tmp_file_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_TRUNC | O_CREAT);
size_t size = container.size();
writeIntBinary(size, file_buf);
for (auto && [k, v] : container)
{
write(k, v, file_buf);
}
file_buf.next();
file_buf.sync();

Poco::File(tmp_file_path).renameTo(path);
}

void restore()
{
if (!Poco::File(path).exists())
return;
DB::ReadBufferFromFile file_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY);
size_t size;
readIntBinary(size, file_buf);
for (size_t i = 0; i < size; ++i)
{
const auto && [k, v] = read(file_buf);
container.emplace(k, v);
}
}

void drop()
{
Poco::File f(path);
if (f.exists())
f.remove(false);
Container<Key, Value> tmp;
container.swap(tmp);
}
using Container = C<K, V>;
using Write = W;
using Read = R;
};

private:
std::string path;
Container<Key, Value> container;
Write write{};
Read read{};
template <typename T, template <typename...> class C, typename W, typename R>
struct VecSetTrait
{
using Container = C<T>;
using Write = W;
using Read = R;
};

template <typename T, template <typename E = T, typename...> class Container, class Write, class Read>
using PersistedContainerSet = PersistedContainerSetOrVector<true, T, Container, Write, Read>;
template <typename K, typename V, template <typename...> class C, class Write, class Read>
using PersistedContainerMap = PersistedContainer<true, false, MapTrait<K, V, C, Write, Read>>;

template <typename T, template <typename...> class C, class Write, class Read>
using PersistedContainerSet = PersistedContainer<false, true, VecSetTrait<T, C, Write, Read>>;

template <typename T, template <typename E = T, typename...> class Container, class Write, class Read>
using PersistedContainerVector = PersistedContainerSetOrVector<false, T, Container, Write, Read>;
template <typename T, template <typename...> class C, class Write, class Read>
using PersistedContainerVector = PersistedContainer<false, false, VecSetTrait<T, C, Write, Read>>;

struct UInt64Write
{
Expand Down Expand Up @@ -177,3 +194,5 @@ struct UInt64StringRead
};
using PersistedUnorderedUInt64ToStringMap
= PersistedContainerMap<UInt64, std::string, std::unordered_map, UInt64StringWrite, UInt64StringRead>;

} // namespace DB
24 changes: 14 additions & 10 deletions dbms/src/Common/tests/persisted_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@

#include <Common/PersistedContainer.h>

using namespace DB;

int main(int, char **)
{
auto clear_file = [=](std::string path) {
Poco::File file(path);
if (file.exists())
file.remove();
};

{
std::string file_path = "persisted_container_set_test.dat";
SCOPE_EXIT({
Poco::File file(file_path);
if (file.exists())
file.remove();
});
clear_file(file_path);
SCOPE_EXIT({ clear_file(file_path); });

{
PersistedUnorderedUInt64Set set(file_path);
set.restore();
Expand All @@ -38,11 +44,9 @@ int main(int, char **)

{
std::string file_path = "persisted_container_map_test.dat";
SCOPE_EXIT({
Poco::File file(file_path);
if (file.exists())
file.remove();
});
clear_file(file_path);
SCOPE_EXIT({ clear_file(file_path); });

{
PersistedUnorderedUInt64ToStringMap map(file_path);
map.restore();
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/DeletingDeletedBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Block DeletingDeletedBlockInputStream::readImpl()
if (!block)
return block;
IColumn::Filter filter(block.rows());
size_t count = setFilterByDeleteMarkColumn(block, filter);
size_t count = setFilterByDelMarkColumn(block, filter);
if (count > 0)
deleteRows(block, filter);
return block;
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/DataStreams/MvccTMTSortedBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <DataStreams/MvccTMTSortedBlockInputStream.h>
#include <Storages/MutableSupport.h>

namespace DB
{
Expand Down Expand Up @@ -42,7 +43,7 @@ void MvccTMTSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
setPrimaryKeyRef(next_key, current);

bool key_differs = next_key != current_key;

if (key_differs && merged_rows >= max_block_size)
return;

Expand All @@ -56,7 +57,7 @@ void MvccTMTSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
selected_row.reset();

current_key.swap(next_key);
}
}

if ((*(current->all_columns[version_column_number]))[current->pos].template get<UInt64>() <= read_tso &&
(selected_row.empty()
Expand Down Expand Up @@ -87,7 +88,8 @@ void MvccTMTSortedBlockInputStream::merge(MutableColumns & merged_columns, std::

bool MvccTMTSortedBlockInputStream::hasDeleteFlag()
{
return (*(*selected_row.columns)[del_column_number])[selected_row.row_num].template get<UInt8>() > 0;
UInt8 val = (*(*selected_row.columns)[del_column_number])[selected_row.row_num].template get<UInt8>();
return MutableSupport::DelMark::isDel(val);
}

}
13 changes: 6 additions & 7 deletions dbms/src/DataStreams/MvccTMTSortedBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace DB

class MvccTMTSortedBlockInputStream : public MergingSortedBlockInputStream
{
public :
public:
MvccTMTSortedBlockInputStream(
const BlockInputStreams & inputs_,
const BlockInputStreams & inputs_,
const SortDescription & description_,
const String & version_column,
const String & del_column,
Expand All @@ -23,31 +23,30 @@ public :
version_column_number = header.getPositionByName(version_column);
del_column_number = header.getPositionByName(del_column);
}

String getName() const override { return "MvccTMTSorted"; }

protected:
Block readImpl() override;

private:
ssize_t version_column_number ;
ssize_t del_column_number ;
ssize_t version_column_number;
ssize_t del_column_number;
Logger * log = &Logger::get("MvccTMTStortedBlockInputStream");

bool finished = false;

RowRef current_key;
RowRef next_key;
RowRef selected_row;

size_t read_tso;

void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);

bool hasDeleteFlag();

void insertRow(MutableColumns &, size_t &);

};

}
Loading