Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port from OpenMP to TBB #1028

Merged
merged 9 commits into from
May 22, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ install:
- sudo apt-add-repository -y ppa:ubuntu-toolchain-r/test
- sudo add-apt-repository -y ppa:boost-latest/ppa
- sudo apt-get update >/dev/null
- sudo apt-get -q install libprotoc-dev libprotobuf7 libprotobuf-dev libosmpbf-dev libbz2-dev libstxxl-dev libstxxl1 libxml2-dev libzip-dev lua5.1 liblua5.1-0-dev rubygems
- sudo apt-get -q install libprotoc-dev libprotobuf7 libprotobuf-dev libosmpbf-dev libbz2-dev libstxxl-dev libstxxl1 libxml2-dev libzip-dev lua5.1 liblua5.1-0-dev rubygems libtbb-dev
- sudo apt-get -q install g++-4.7
- sudo apt-get install libboost1.54-all-dev
#luabind
Expand Down
5 changes: 5 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ target_link_libraries(osrm-datastore ${Boost_LIBRARIES} UUID GITDESCRIPTION COOR
find_package(Threads REQUIRED)
target_link_libraries(osrm-extract ${CMAKE_THREAD_LIBS_INIT})

find_package(TBB REQUIRED)
target_link_libraries(osrm-extract ${TBB_LIBRARIES})
target_link_libraries(osrm-prepare ${TBB_LIBRARIES})
include_directories(${TBB_INCLUDE_DIR})

find_package(Lua52)
if(NOT LUA52_FOUND)
find_package(Lua51 REQUIRED)
Expand Down
198 changes: 119 additions & 79 deletions Contractor/Contractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "../Util/OpenMPWrapper.h"
#include "../Util/SimpleLogger.h"
#include "../Util/StringUtil.h"
#include "../Util/TimingUtil.h"

#include <boost/assert.hpp>

#include <tbb/enumerable_thread_specific.h>
#include <tbb/parallel_for.h>

#include <algorithm>
#include <limits>
#include <vector>
Expand Down Expand Up @@ -125,6 +129,28 @@ class Contractor
bool is_independent : 1;
};


struct ThreadDataContainer
{
ThreadDataContainer(int number_of_nodes) : number_of_nodes(number_of_nodes) {}

inline ContractorThreadData* getThreadData()
{
bool exists = false;
auto& ref = data.local(exists);
if (!exists)
{
ref = std::make_shared<ContractorThreadData>(number_of_nodes);
}

return ref.get();
}

int number_of_nodes;
typedef tbb::enumerable_thread_specific<std::shared_ptr<ContractorThreadData>> EnumerableThreadData;
EnumerableThreadData data;
};

public:
template <class ContainerT> Contractor(int nodes, ContainerT &input_edge_list)
{
Expand Down Expand Up @@ -262,39 +288,51 @@ class Contractor

void Run()
{
// for the preperation we can use a big grain size, which is much faster (probably cache)
constexpr size_t InitGrainSize = 100000;
constexpr size_t PQGrainSize = 100000;
// auto_partitioner will automatically increase the blocksize if we have
// a lot of data. It is *important* for the last loop iterations
// (which have a very small dataset) that it is devisible.
constexpr size_t IndependentGrainSize = 1;
constexpr size_t ContractGrainSize = 1;
constexpr size_t NeighboursGrainSize = 1;
constexpr size_t DeleteGrainSize = 1;

const NodeID number_of_nodes = contractor_graph->GetNumberOfNodes();
Percent p(number_of_nodes);

const unsigned thread_count = omp_get_max_threads();
std::vector<ContractorThreadData *> thread_data_list;
for (unsigned thread_id = 0; thread_id < thread_count; ++thread_id)
{
thread_data_list.push_back(new ContractorThreadData(number_of_nodes));
}
std::cout << "Contractor is using " << thread_count << " threads" << std::endl;
ThreadDataContainer thread_data_list(number_of_nodes);

NodeID number_of_contracted_nodes = 0;
std::vector<RemainingNodeData> remaining_nodes(number_of_nodes);
std::vector<float> node_priorities(number_of_nodes);
std::vector<NodePriorityData> node_data(number_of_nodes);

// initialize priorities in parallel
#pragma omp parallel for schedule(guided)
for (int x = 0; x < (int)number_of_nodes; ++x)
{
remaining_nodes[x].id = x;
}

// initialize priorities in parallel
tbb::parallel_for(tbb::blocked_range<int>(0, number_of_nodes, InitGrainSize),
[&remaining_nodes](const tbb::blocked_range<int>& range)
{
for (int x = range.begin(); x != range.end(); ++x)
{
remaining_nodes[x].id = x;
}
}
);


std::cout << "initializing elimination PQ ..." << std::flush;
#pragma omp parallel
{
ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
#pragma omp parallel for schedule(guided)
for (int x = 0; x < (int)number_of_nodes; ++x)
tbb::parallel_for(tbb::blocked_range<int>(0, number_of_nodes, PQGrainSize),
[this, &node_priorities, &node_data, &thread_data_list](const tbb::blocked_range<int>& range)
{
node_priorities[x] = EvaluateNodePriority(data, &node_data[x], x);
ContractorThreadData *data = thread_data_list.getThreadData();
for (int x = range.begin(); x != range.end(); ++x)
{
node_priorities[x] = this->EvaluateNodePriority(data, &node_data[x], x);
}
}
}
);
std::cout << "ok" << std::endl << "preprocessing " << number_of_nodes << " nodes ..."
<< std::flush;

Expand All @@ -309,11 +347,7 @@ class Contractor
std::cout << " [flush " << number_of_contracted_nodes << " nodes] " << std::flush;

// Delete old heap data to free memory that we need for the coming operations
for (ContractorThreadData *data : thread_data_list)
{
delete data;
}
thread_data_list.clear();
thread_data_list.data.clear();

// Create new priority array
std::vector<float> new_node_priority(remaining_nodes.size());
Expand Down Expand Up @@ -396,61 +430,69 @@ class Contractor

// INFO: MAKE SURE THIS IS THE LAST OPERATION OF THE FLUSH!
// reinitialize heaps and ThreadData objects with appropriate size
for (unsigned thread_id = 0; thread_id < thread_count; ++thread_id)
{
thread_data_list.push_back(
new ContractorThreadData(contractor_graph->GetNumberOfNodes()));
}
thread_data_list.number_of_nodes = contractor_graph->GetNumberOfNodes();
}

const int last = (int)remaining_nodes.size();
#pragma omp parallel
{
// determine independent node set
ContractorThreadData *const data = thread_data_list[omp_get_thread_num()];
#pragma omp for schedule(guided)
for (int i = 0; i < last; ++i)
tbb::parallel_for(tbb::blocked_range<int>(0, last, IndependentGrainSize),
[this, &node_priorities, &remaining_nodes, &thread_data_list](const tbb::blocked_range<int>& range)
{
const NodeID node = remaining_nodes[i].id;
remaining_nodes[i].is_independent =
IsNodeIndependent(node_priorities, data, node);
ContractorThreadData *data = thread_data_list.getThreadData();
// determine independent node set
for (int i = range.begin(); i != range.end(); ++i)
{
const NodeID node = remaining_nodes[i].id;
remaining_nodes[i].is_independent =
this->IsNodeIndependent(node_priorities, data, node);
}
}
}
);

const auto first = stable_partition(remaining_nodes.begin(),
remaining_nodes.end(),
[](RemainingNodeData node_data)
{ return !node_data.is_independent; });
const int first_independent_node = first - remaining_nodes.begin();
// contract independent nodes
#pragma omp parallel
{
ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
#pragma omp for schedule(guided) nowait
for (int position = first_independent_node; position < last; ++position)

// contract independent nodes
tbb::parallel_for(tbb::blocked_range<int>(first_independent_node, last, ContractGrainSize),
[this, &remaining_nodes, &thread_data_list](const tbb::blocked_range<int>& range)
{
NodeID x = remaining_nodes[position].id;
ContractNode<false>(data, x);
ContractorThreadData *data = thread_data_list.getThreadData();
for (int position = range.begin(); position != range.end(); ++position)
{
const NodeID x = remaining_nodes[position].id;
this->ContractNode<false>(data, x);
}
}

std::sort(data->inserted_edges.begin(), data->inserted_edges.end());
}
#pragma omp parallel
{
ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
#pragma omp for schedule(guided) nowait
for (int position = first_independent_node; position < last; ++position)
);
// make sure we really sort each block
tbb::parallel_for(thread_data_list.data.range(),
[&](const ThreadDataContainer::EnumerableThreadData::range_type& range)
{
NodeID x = remaining_nodes[position].id;
DeleteIncomingEdges(data, x);
for (auto& data : range)
std::sort(data->inserted_edges.begin(),
data->inserted_edges.end());
}
}
);
tbb::parallel_for(tbb::blocked_range<int>(first_independent_node, last, DeleteGrainSize),
[this, &remaining_nodes, &thread_data_list](const tbb::blocked_range<int>& range)
{
ContractorThreadData *data = thread_data_list.getThreadData();
for (int position = range.begin(); position != range.end(); ++position)
{
const NodeID x = remaining_nodes[position].id;
this->DeleteIncomingEdges(data, x);
}
}
);

// insert new edges
for (unsigned thread_id = 0; thread_id < thread_count; ++thread_id)
for (auto& data : thread_data_list.data)
{
ContractorThreadData &data = *thread_data_list[thread_id];
for (const ContractorEdge &edge : data.inserted_edges)
for (const ContractorEdge &edge : data->inserted_edges)
{
auto current_edge_ID = contractor_graph->FindEdge(edge.source, edge.target);
const EdgeID current_edge_ID = contractor_graph->FindEdge(edge.source, edge.target);
if (current_edge_ID < contractor_graph->EndEdges(edge.source))
{
ContractorGraph::EdgeData &current_data =
Expand All @@ -466,19 +508,21 @@ class Contractor
}
contractor_graph->InsertEdge(edge.source, edge.target, edge.data);
}
data.inserted_edges.clear();
data->inserted_edges.clear();
}
// update priorities
#pragma omp parallel
{
ContractorThreadData *data = thread_data_list[omp_get_thread_num()];
#pragma omp for schedule(guided) nowait
for (int position = first_independent_node; position < last; ++position)

tbb::parallel_for(tbb::blocked_range<int>(first_independent_node, last, NeighboursGrainSize),
[this, &remaining_nodes, &node_priorities, &node_data, &thread_data_list](const tbb::blocked_range<int>& range)
{
NodeID x = remaining_nodes[position].id;
UpdateNodeNeighbours(node_priorities, node_data, data, x);
ContractorThreadData *data = thread_data_list.getThreadData();
for (int position = range.begin(); position != range.end(); ++position)
{
NodeID x = remaining_nodes[position].id;
this->UpdateNodeNeighbours(node_priorities, node_data, data, x);
}
}
}
);

// remove contracted nodes from the pool
number_of_contracted_nodes += last - first_independent_node;
remaining_nodes.resize(first_independent_node);
Expand Down Expand Up @@ -510,11 +554,8 @@ class Contractor

p.printStatus(number_of_contracted_nodes);
}
for (ContractorThreadData *data : thread_data_list)
{
delete data;
}
thread_data_list.clear();

thread_data_list.data.clear();
}

template <class Edge> inline void GetEdges(DeallocatingVector<Edge> &edges)
Expand Down Expand Up @@ -769,7 +810,6 @@ class Contractor
true,
true,
false);
;
inserted_edges.push_back(new_edge);
std::swap(new_edge.source, new_edge.target);
new_edge.data.forward = false;
Expand Down
3 changes: 1 addition & 2 deletions Contractor/TemporaryStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "TemporaryStorage.h"

StreamData::StreamData()
: write_mode(true), temp_path(boost::filesystem::unique_path(temp_directory.append(
TemporaryFilePattern.begin(), TemporaryFilePattern.end()))),
: write_mode(true), temp_path(boost::filesystem::unique_path(temp_directory / TemporaryFilePattern)),
temp_file(new boost::filesystem::fstream(
temp_path, std::ios::in | std::ios::out | std::ios::trunc | std::ios::binary)),
readWriteMutex(std::make_shared<boost::mutex>())
Expand Down
5 changes: 2 additions & 3 deletions DataStructures/DynamicGraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <algorithm>
#include <limits>
#include <vector>
#include <atomic>

template <typename EdgeDataT> class DynamicGraph
{
Expand Down Expand Up @@ -198,7 +199,6 @@ template <typename EdgeDataT> class DynamicGraph
void DeleteEdge(const NodeIterator source, const EdgeIterator e)
{
Node &node = m_nodes[source];
#pragma omp atomic
--m_numEdges;
--node.edges;
BOOST_ASSERT(std::numeric_limits<unsigned>::max() != node.edges);
Expand Down Expand Up @@ -226,7 +226,6 @@ template <typename EdgeDataT> class DynamicGraph
}
}

#pragma omp atomic
m_numEdges -= deleted;
m_nodes[source].edges -= deleted;

Expand Down Expand Up @@ -272,7 +271,7 @@ template <typename EdgeDataT> class DynamicGraph
};

NodeIterator m_numNodes;
EdgeIterator m_numEdges;
std::atomic_uint m_numEdges;

std::vector<Node> m_nodes;
DeallocatingVector<Edge> m_edges;
Expand Down
5 changes: 2 additions & 3 deletions DataStructures/Percent.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

#include "../Util/OpenMPWrapper.h"
#include <iostream>
#include <atomic>

class Percent
{
Expand Down Expand Up @@ -61,20 +62,18 @@ class Percent

void printIncrement()
{
#pragma omp atomic
++m_current_value;
printStatus(m_current_value);
}

void printAddition(const unsigned addition)
{
#pragma omp atomic
m_current_value += addition;
printStatus(m_current_value);
}

private:
unsigned m_current_value;
std::atomic_uint m_current_value;
unsigned m_max_value;
unsigned m_percent_interval;
unsigned m_next_threshold;
Expand Down
Loading