Skip to content

Commit

Permalink
A bit more threaded_queue work
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobdufault committed May 24, 2018
1 parent bde76bd commit 2e31ddc
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 78 deletions.
48 changes: 22 additions & 26 deletions src/command_line.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,26 +176,28 @@ bool QueryDbMainLoop(QueryDatabase* db,
CodeCompleteCache* non_global_code_complete_cache,
CodeCompleteCache* signature_cache) {
auto* queue = QueueManager::instance();
std::vector<std::unique_ptr<InMessage>> messages =
queue->for_querydb.DequeueAll();
bool did_work = messages.size();
for (auto& message : messages) {
// TODO: Consider using std::unordered_map to lookup the handler
bool did_work = false;

optional<std::unique_ptr<InMessage>> message = queue->for_querydb.TryDequeue(true /*priority*/);
while (message) {
did_work = true;

bool found_handler = false;
for (MessageHandler* handler : *MessageHandler::message_handlers) {
if (handler->GetMethodType() == message->GetMethodType()) {
handler->Run(std::move(message));
if (handler->GetMethodType() == (*message)->GetMethodType()) {
handler->Run(std::move(*message));
found_handler = true;
break;
}
}

if (message) {
LOG_S(FATAL) << "Exiting; no handler for " << message->GetMethodType();
if (!found_handler) {
LOG_S(FATAL) << "Exiting; no handler for " << (*message)->GetMethodType();
exit(1);
}
}

// TODO: consider rate-limiting and checking for IPC messages so we don't
// block requests / we can serve partial requests.
message = queue->for_querydb.TryDequeue(true /*priority*/);
}

if (QueryDb_ImportMain(db, import_manager, status, semantic_cache,
working_files)) {
Expand Down Expand Up @@ -356,23 +358,17 @@ void LaunchStdoutThread(std::unordered_map<MethodType, Timer>* request_times) {
auto* queue = QueueManager::instance();

while (true) {
std::vector<Stdout_Request> messages = queue->for_stdout.DequeueAll();
if (messages.empty()) {
QueueManager::instance()->stdout_waiter->Wait(&queue->for_stdout);
continue;
}
Stdout_Request message = queue->for_stdout.Dequeue();

for (auto& message : messages) {
if (ShouldDisplayMethodTiming(message.method)) {
Timer time = (*request_times)[message.method];
time.ResetAndPrint("[e2e] Running " + std::string(message.method));
}
if (ShouldDisplayMethodTiming(message.method)) {
Timer time = (*request_times)[message.method];
time.ResetAndPrint("[e2e] Running " + std::string(message.method));
}

RecordOutput(message.content);
RecordOutput(message.content);

fwrite(message.content.c_str(), message.content.size(), 1, stdout);
fflush(stdout);
}
fwrite(message.content.c_str(), message.content.size(), 1, stdout);
fflush(stdout);
}
});
}
Expand Down
14 changes: 7 additions & 7 deletions src/import_pipeline.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ bool IndexMain_DoParse(
ImportManager* import_manager,
IIndexer* indexer) {
auto* queue = QueueManager::instance();
optional<Index_Request> request = queue->index_request.TryPop(true /*priority*/);
optional<Index_Request> request = queue->index_request.TryDequeue(true /*priority*/);
if (!request)
return false;

Expand All @@ -442,7 +442,7 @@ bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) {
bool did_work = false;
IterationLoop loop;
while (loop.Next()) {
optional<Index_OnIdMapped> response = queue->on_id_mapped.TryPop(true /*priority*/);
optional<Index_OnIdMapped> response = queue->on_id_mapped.TryDequeue(true /*priority*/);
if (!response)
return did_work;

Expand Down Expand Up @@ -483,7 +483,7 @@ bool IndexMain_DoCreateIndexUpdate(TimestampManager* timestamp_manager) {

bool IndexMain_LoadPreviousIndex() {
auto* queue = QueueManager::instance();
optional<Index_DoIdMap> response = queue->load_previous_index.TryPop(true /*priority*/);
optional<Index_DoIdMap> response = queue->load_previous_index.TryDequeue(true /*priority*/);
if (!response)
return false;

Expand All @@ -502,14 +502,14 @@ bool IndexMergeIndexUpdates() {
// by querydb asap.

auto* queue = QueueManager::instance();
optional<Index_OnIndexed> root = queue->on_indexed_for_merge.TryPop(false /*priority*/);
optional<Index_OnIndexed> root = queue->on_indexed_for_merge.TryDequeue(false /*priority*/);
if (!root)
return false;

bool did_merge = false;
IterationLoop loop;
while (loop.Next()) {
optional<Index_OnIndexed> to_join = queue->on_indexed_for_merge.TryPop(false /*priority*/);
optional<Index_OnIndexed> to_join = queue->on_indexed_for_merge.TryDequeue(false /*priority*/);
if (!to_join)
break;
did_merge = true;
Expand Down Expand Up @@ -715,7 +715,7 @@ bool QueryDb_ImportMain(QueryDatabase* db,

IterationLoop loop;
while (loop.Next()) {
optional<Index_DoIdMap> request = queue->do_id_map.TryPop(true /*priority*/);
optional<Index_DoIdMap> request = queue->do_id_map.TryDequeue(true /*priority*/);
if (!request)
break;
did_work = true;
Expand All @@ -724,7 +724,7 @@ bool QueryDb_ImportMain(QueryDatabase* db,

loop.Reset();
while (loop.Next()) {
optional<Index_OnIndexed> response = queue->on_indexed_for_querydb.TryPop(true /*priority*/);
optional<Index_OnIndexed> response = queue->on_indexed_for_querydb.TryDequeue(true /*priority*/);
if (!response)
break;
did_work = true;
Expand Down
2 changes: 1 addition & 1 deletion src/messages/text_document_code_action.cc
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ struct Handler_TextDocumentCodeAction
std::string::npos)
continue;

Maybe<QueryFileId> decl_file_id =
optional<QueryFileId> decl_file_id =
GetDeclarationFileForSymbol(db, db->symbols[i]);
if (!decl_file_id)
continue;
Expand Down
4 changes: 2 additions & 2 deletions src/messages/text_document_code_lens.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ struct Handler_TextDocumentCodeLens
// extent since that is better for outline. This tries to convert the
// extent location to the spelling location.
auto try_ensure_spelling = [&](Use use) {
Maybe<Use> def = GetDefinitionSpell(db, use);
optional<Use> def = GetDefinitionSpell(db, use);
if (!def || def->range.start.line != use.range.start.line) {
return use;
}
Expand Down Expand Up @@ -175,7 +175,7 @@ struct Handler_TextDocumentCodeLens

// "Base"
if (def->bases.size() == 1) {
Maybe<Use> base_loc = GetDefinitionSpell(
optional<Use> base_loc = GetDefinitionSpell(
db, SymbolIdx{def->bases[0], SymbolKind::Func});
if (base_loc) {
optional<lsLocation> ls_base =
Expand Down
6 changes: 3 additions & 3 deletions src/messages/text_document_definition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ std::vector<Use> GetNonDefDeclarationTargets(QueryDatabase* db, SymbolRef sym) {
if (ret.empty()) {
for (auto& def : db->GetVar(sym).def)
if (def.type) {
if (Maybe<Use> use = GetDefinitionSpell(
if (optional<Use> use = GetDefinitionSpell(
db, SymbolIdx{*def.type, SymbolKind::Type})) {
ret.push_back(*use);
break;
Expand Down Expand Up @@ -141,7 +141,7 @@ struct Handler_TextDocumentDefinition
auto pos = name.rfind(short_query);
if (pos == std::string::npos)
continue;
if (Maybe<Use> use = GetDefinitionSpell(db, db->symbols[i])) {
if (optional<Use> use = GetDefinitionSpell(db, db->symbols[i])) {
std::tuple<int, int, bool, int> score{
int(name.size() - short_query.size()), -int(pos),
use->file != file_id,
Expand All @@ -160,7 +160,7 @@ struct Handler_TextDocumentDefinition
}
}
if (best_i != -1) {
Maybe<Use> use = GetDefinitionSpell(db, db->symbols[best_i]);
optional<Use> use = GetDefinitionSpell(db, db->symbols[best_i]);
assert(use);
if (auto ls_loc = GetLsLocation(db, working_files, *use))
out.result.push_back(*ls_loc);
Expand Down
2 changes: 1 addition & 1 deletion src/messages/workspace_symbol.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ bool InsertSymbolIntoResult(QueryDatabase* db,
if (!info)
return false;

Maybe<Use> location = GetDefinitionExtent(db, symbol);
optional<Use> location = GetDefinitionExtent(db, symbol);
Use loc;
if (location)
loc = *location;
Expand Down
8 changes: 4 additions & 4 deletions src/query_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ std::vector<Use> GetDeclarations(std::vector<Q>& entities,

} // namespace

Maybe<Use> GetDefinitionSpell(QueryDatabase* db, SymbolIdx sym) {
Maybe<Use> ret;
optional<Use> GetDefinitionSpell(QueryDatabase* db, SymbolIdx sym) {
optional<Use> ret;
EachEntityDef(db, sym, [&](const auto& def) { return !(ret = def.spell); });
return ret;
}

Maybe<Use> GetDefinitionExtent(QueryDatabase* db, SymbolIdx sym) {
optional<Use> GetDefinitionExtent(QueryDatabase* db, SymbolIdx sym) {
// Used to jump to file.
if (sym.kind == SymbolKind::File)
return Use(Range(Position(0, 0), Position(0, 0)), sym.id, sym.kind,
Expand All @@ -54,7 +54,7 @@ Maybe<Use> GetDefinitionExtent(QueryDatabase* db, SymbolIdx sym) {
return ret;
}

Maybe<QueryFileId> GetDeclarationFileForSymbol(QueryDatabase* db,
optional<QueryFileId> GetDeclarationFileForSymbol(QueryDatabase* db,
SymbolIdx sym) {
switch (sym.kind) {
case SymbolKind::File:
Expand Down
6 changes: 3 additions & 3 deletions src/query_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

#include <optional.h>

Maybe<Use> GetDefinitionSpell(QueryDatabase* db, SymbolIdx sym);
Maybe<Use> GetDefinitionExtent(QueryDatabase* db, SymbolIdx sym);
Maybe<QueryFileId> GetDeclarationFileForSymbol(QueryDatabase* db,
optional<Use> GetDefinitionSpell(QueryDatabase* db, SymbolIdx sym);
optional<Use> GetDefinitionExtent(QueryDatabase* db, SymbolIdx sym);
optional<QueryFileId> GetDeclarationFileForSymbol(QueryDatabase* db,
SymbolIdx sym);

// Get defining declaration (if exists) or an arbitrary declaration (otherwise)
Expand Down
42 changes: 11 additions & 31 deletions src/threaded_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <vector>
#include <tuple>
#include <utility>

Expand Down Expand Up @@ -120,26 +120,6 @@ struct ThreadedQueue : public BaseThreadQueue {
waiter->cv.notify_all();
}

// Return all elements in the queue.
std::vector<T> DequeueAll() {
std::lock_guard<std::mutex> lock(mutex);

total_count_ = 0;

std::vector<T> result;
result.reserve(priority_.size() + queue_.size());
while (!priority_.empty()) {
result.emplace_back(std::move(priority_.front()));
priority_.pop_front();
}
while (!queue_.empty()) {
result.emplace_back(std::move(queue_.front()));
queue_.pop_front();
}

return result;
}

// Returns true if the queue is empty. This is lock-free.
bool IsEmpty() {
return total_count_ == 0;
Expand All @@ -151,9 +131,9 @@ struct ThreadedQueue : public BaseThreadQueue {
waiter->cv.wait(lock,
[&]() { return !priority_.empty() || !queue_.empty(); });

auto execute = [&](std::deque<T>* q) {
auto val = std::move(q->front());
q->pop_front();
auto execute = [&](std::vector<T>* q) {
auto val = std::move(q->back());
q->pop_back();
--total_count_;
return std::move(val);
};
Expand All @@ -164,17 +144,17 @@ struct ThreadedQueue : public BaseThreadQueue {

// Get the first element from the queue without blocking. Returns a null
// value if the queue is empty.
optional<T> TryPop(bool priority) {
optional<T> TryDequeue(bool priority) {
std::lock_guard<std::mutex> lock(mutex);

auto pop = [&](std::deque<T>* q) {
auto val = std::move(q->front());
q->pop_front();
auto pop = [&](std::vector<T>* q) {
auto val = std::move(q->back());
q->pop_back();
--total_count_;
return std::move(val);
};

auto get_result = [&](std::deque<T>* first, std::deque<T>* second) -> optional<T> {
auto get_result = [&](std::vector<T>* first, std::vector<T>* second) -> optional<T> {
if (!first->empty())
return pop(first);
if (!second->empty())
Expand All @@ -200,6 +180,6 @@ struct ThreadedQueue : public BaseThreadQueue {

private:
std::atomic<int> total_count_;
std::deque<T> priority_;
std::deque<T> queue_;
std::vector<T> priority_;
std::vector<T> queue_;
};

0 comments on commit 2e31ddc

Please sign in to comment.