Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some performance related issues #940

Merged
merged 12 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 23 additions & 40 deletions src/consensus/pbft/libbyz/Big_req_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,6 @@
#include "Request.h"
#include "ds/logger.h"

struct Waiting_pp
{
Seqno n;
View v;
int i;
};

class BR_entry
{
public:
inline BR_entry() : r(0), maxn(-1), maxv(-1) {}
inline ~BR_entry()
{
delete r;
}

Digest rd; // Request's digest
Request* r; // Request or 0 is request not received
// if r=0, Seqnos of pre-prepares waiting for request
std::vector<Waiting_pp> waiting;
Seqno maxn; // Maximum seqno of pre-prepare referencing request
View maxv; // Maximum view in which this entry was marked useful
};

Big_req_table::Big_req_table(size_t num_of_replicas) :
breqs(max_out),
last_stable(0),
Expand Down Expand Up @@ -67,7 +43,7 @@ inline void Big_req_table::remove_unmatched(BR_entry* bre)
PBFT_ASSERT(bre->r != 0, "Invalid state");
auto& centry = unmatched[bre->r->client_id()];

centry.requests.remove(bre->r);
centry.list.remove(bre);
centry.num_requests--;
PBFT_ASSERT(centry.num_requests >= 0, "Should be positive");
}
Expand Down Expand Up @@ -204,35 +180,37 @@ bool Big_req_table::check_pcerts(BR_entry* bre)
return false;
}

bool Big_req_table::add_unmatched(Request* r, Request*& old_req)
bool Big_req_table::add_unmatched(BR_entry* e, Request*& old_req)
{
auto& centry = unmatched[r->client_id()];
auto& centry = unmatched[e->r->client_id()];
old_req = 0;

if (
!centry.requests.empty() &&
centry.last_value_seen[r->user_id()] >= r->request_id())
!centry.list.is_empty() &&
centry.last_value_seen[e->r->user_id()] >= e->r->request_id())
{
// client is expected to send requests in request id order
LOG_FAIL_FMT(
"client is expected to send requests in request id order {}",
r->client_id());
"client is expected to send requests in request id order {}, last seen "
"{}",
e->r->client_id(),
centry.last_value_seen[e->r->user_id()]);
return false;
}

if (centry.num_requests >= Max_unmatched_requests_per_client)
{
LOG_FAIL_FMT("Too many Requests pending from client: {}", r->client_id());
old_req = centry.requests.back();
centry.requests.pop_back();
LOG_FAIL_FMT(
"Too many Requests pending from client: {}", e->r->client_id());
old_req = centry.list.pop_tail()->r;
}
else
{
centry.num_requests++;
}

centry.last_value_seen[r->user_id()] = r->request_id();
centry.requests.push_front(r);
centry.last_value_seen[e->r->user_id()] = e->r->request_id();
centry.list.insert(e);
return true;
}

Expand Down Expand Up @@ -294,12 +272,12 @@ bool Big_req_table::add_request(Request* r, bool verified)
// Buffer up to Max_unmatched_requests_per_client requests with the
// largest timestamps from client.
Request* old_req = 0;
bool added = add_unmatched(r, old_req);
auto bre = new BR_entry();
bre->rd = rd;
bre->r = r;
bool added = add_unmatched(bre, old_req);
if (added)
{
auto bre = new BR_entry;
bre->rd = rd;
bre->r = r;
breqs.insert({rd, bre});

if (old_req)
Expand All @@ -313,6 +291,11 @@ bool Big_req_table::add_request(Request* r, bool verified)

return true;
}
else
{
bre->r = nullptr;
delete bre;
}
}
return false;
}
Expand Down
33 changes: 30 additions & 3 deletions src/consensus/pbft/libbyz/Big_req_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,43 @@

#include "Digest.h"
#include "Req_queue.h"
#include "ds/dllist.h"
#include "ds/thread_messaging.h"
#include "types.h"

#include <list>
#include <unordered_map>
#include <vector>

class BR_entry;
class Request;

struct Waiting_pp
{
Seqno n;
View v;
int i;
};

class BR_entry
{
public:
inline BR_entry() : r(0), maxn(-1), maxv(-1), next(nullptr), prev(nullptr) {}
inline ~BR_entry()
{
delete r;
}

Digest rd; // Request's digest
Request* r; // Request or 0 is request not received
// if r=0, Seqnos of pre-prepares waiting for request
std::vector<Waiting_pp> waiting;
Seqno maxn; // Maximum seqno of pre-prepare referencing request
View maxv; // Maximum view in which this entry was marked useful

BR_entry* next;
BR_entry* prev;
};

class Big_req_table
{
//
Expand Down Expand Up @@ -98,7 +125,7 @@ class Big_req_table
// Effects: Removes bre->r from unmatched if it was not previously matched
// to a pre-prepare.

bool add_unmatched(Request* r, Request*& old_req);
bool add_unmatched(BR_entry* e, Request*& old_req);
// Effects: Adds r to the list of requests for the client if the request
// id is greater than the largest in the list and returns true. If this causes
// the number of requests to exceed Max_unmatched_requests_per_client,
Expand All @@ -114,7 +141,7 @@ class Big_req_table
struct Unmatched_requests
{
Unmatched_requests() : num_requests(0) {}
std::list<Request*> requests;
snmalloc::DLList<BR_entry> list;
int num_requests;
std::array<uint64_t, enclave::ThreadMessaging::max_num_threads>
last_value_seen = {0};
Expand Down
14 changes: 2 additions & 12 deletions src/consensus/pbft/libbyz/Checkpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ Checkpoint::Checkpoint(Seqno s, Digest& d, bool stable) :
auth_len = sizeof(Checkpoint_rep);
auth_src_offset = 0;
#else
rep().sig_size = pbft::GlobalState::get_node().gen_signature(
contents(), sizeof(Checkpoint_rep), contents() + sizeof(Checkpoint_rep));
rep().sig_size = 0;
#endif
}

Expand All @@ -52,8 +51,7 @@ void Checkpoint::re_authenticate(Principal* p, bool stable)
if (rep().extra != 1 && stable)
{
rep().extra = 1;
rep().sig_size = pbft::GlobalState::get_node().gen_signature(
contents(), sizeof(Checkpoint_rep), contents() + sizeof(Checkpoint_rep));
rep().sig_size = 0;
}
#endif
}
Expand All @@ -66,14 +64,6 @@ bool Checkpoint::pre_verify()
return false;
}

// Check signature size.
if (
size() - (int)sizeof(Checkpoint_rep) <
pbft::GlobalState::get_node().auth_size(id()))
{
return false;
}

return true;
}

Expand Down
6 changes: 6 additions & 0 deletions src/consensus/pbft/libbyz/Replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ Message* Replica::create_message(const uint8_t* data, uint32_t size)

default:
// Unknown message type.
LOG_FAIL_FMT("Unknown message type:{}", Message::get_tag(data));
delete m;
return nullptr;
}
Expand All @@ -329,6 +330,11 @@ void Replica::receive_message(const uint8_t* data, uint32_t size)
LOG_FAIL << "Received message size exceeds message: " << size << std::endl;
}
Message* m = create_message(data, size);
if (m == nullptr)
{
return;
}

uint32_t target_thread = 0;

if (enclave::ThreadMessaging::thread_count > 1 && m->tag() == Request_tag)
Expand Down
79 changes: 34 additions & 45 deletions src/consensus/pbft/libbyz/Req_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,13 @@
#include "Pre_prepare.h"
#include "Request.h"

Req_queue::Req_queue() :
reqs(Max_num_replicas),
head(nullptr),
tail(nullptr),
nelems(0),
nbytes(0)
{}
Req_queue::Req_queue() : reqs(Max_num_replicas), nelems(0), nbytes(0) {}

bool Req_queue::append(Request* r)
{
size_t cid = r->client_id();
Request_id rid = r->request_id();

int user_id = r->user_id();
auto it = reqs.find({cid, rid});
if (it == reqs.end())
{
Expand All @@ -30,18 +24,7 @@ bool Req_queue::append(Request* r)
auto rn = std::make_unique<RNode>();
rn->r.reset(r);

if (head == nullptr)
{
head = tail = rn.get();
rn->prev = rn->next = nullptr;
}
else
{
tail->next = rn.get();
rn->prev = tail;
rn->next = nullptr;
tail = rn.get();
}
rnodes[user_id].insert_back(rn.get());

reqs.insert({Key{cid, rid}, std::move(rn)});
return true;
Expand All @@ -66,24 +49,29 @@ bool Req_queue::is_in_rqueue(Request* r)

Request* Req_queue::remove()
{
if (head == nullptr)
{
return nullptr;
}

Request* ret = head->r.release();
PBFT_ASSERT(ret != 0, "Invalid state");
uint32_t tcount = enclave::ThreadMessaging::thread_count;
tcount = std::max(tcount, (uint32_t)1);

head = head->next;
if (head != nullptr)
bool found = false;
for (uint32_t i = 0; i < tcount; ++i)
{
head->prev = nullptr;
if (!rnodes[count % tcount].is_empty())
{
found = true;
break;
}
count++;
}
else

if (!found)
{
tail = nullptr;
return nullptr;
}

auto rn = rnodes[count % tcount].pop();
Request* ret = rn->r.release();
PBFT_ASSERT(ret != 0, "Invalid state");

nelems--;
nbytes -= ret->size();

Expand All @@ -108,34 +96,35 @@ bool Req_queue::remove(int cid, Request_id rid)
bool ret = false;
if (rn->prev == nullptr)
{
PBFT_ASSERT(head == rn.get(), "Invalid state");
head = rn->next;
PBFT_ASSERT(
rnodes[rn->r->user_id()].get_head() == rn.get(), "Invalid state");
rnodes[rn->r->user_id()].remove(rn.get());
ret = true;
}
else
{
rn->prev->next = rn->next;
}

if (rn->next == nullptr)
{
PBFT_ASSERT(tail == rn.get(), "Invalid state");
tail = rn->prev;
}
else
{
rn->next->prev = rn->prev;
}

reqs.erase(it);

return ret;
}

void Req_queue::clear()
{
uint32_t tcount = enclave::ThreadMessaging::thread_count;
// There is a corner case when we run the very first transaction that
// thread_count can be 0. The use of std::max is a work around.
tcount = std::max(tcount, (uint32_t)1);
ashamis marked this conversation as resolved.
Show resolved Hide resolved
for (uint32_t i = 0; i < tcount; ++i)
{
while (!rnodes[i].is_empty())
{
rnodes[i].pop();
}
}
reqs.clear();
head = tail = nullptr;
nelems = nbytes = 0;
}

Expand Down
Loading