Implement fragment, mr and pipeline on GPU
zanmato1984 committed Jan 15, 2021
1 parent be344a9 commit f189612
Showing 3 changed files with 180 additions and 5 deletions.
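In short, this commit: (1) gives Fragment a cudf-backed constructor and a cudf::table_view accessor in include/cura/data/fragment.h; (2) backs the memory resources in src/execution/memory_resource.cpp with RMM device resources when built with USE_CUDF; and (3) copies fragments between Arrow (host) and cudf (device) representations at the pipeline boundary in src/execution/pipeline.cpp. In the listings below, lines prefixed with "-" are removed by the commit; all other shown lines are context or additions.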
13 changes: 13 additions & 0 deletions include/cura/data/fragment.h
@@ -4,6 +4,11 @@

#include <algorithm>

#ifdef USE_CUDF
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#endif

namespace cura::data {

struct ColumnScalar;
@@ -15,6 +20,10 @@ using cura::type::Schema;
struct Fragment {
explicit Fragment(std::vector<std::shared_ptr<const Column>> columns_);

#ifdef USE_CUDF
explicit Fragment(const Schema &schema, std::unique_ptr<cudf::table> &&table);
#endif

explicit Fragment(std::shared_ptr<arrow::RecordBatch> record_batch);

size_t size() const;
@@ -41,6 +50,10 @@ struct Fragment {
auto end() { return std::end(columns); }
auto end() const { return std::cend(columns); }

#ifdef USE_CUDF
cudf::table_view cudf() const;
#endif

std::shared_ptr<arrow::RecordBatch> arrow() const;

private:
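A minimal usage sketch of the new GPU-side constructor and accessor (not part of the commit; schema and make_device_table() are assumed placeholders):

#ifdef USE_CUDF
// Sketch only: wrap an owned device table in a Fragment, then obtain a
// non-owning view for cudf operations. make_device_table() stands in for
// any producer of a std::unique_ptr<cudf::table> whose columns match schema.
std::unique_ptr<cudf::table> table = make_device_table();
cura::data::Fragment frag(schema, std::move(table));
cudf::table_view view = frag.cudf(); // valid only while frag is alive
#endif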
108 changes: 103 additions & 5 deletions src/execution/memory_resource.cpp
@@ -4,18 +4,57 @@

#include <string>

#ifdef USE_CUDF
#include <rmm/mr/device/arena_memory_resource.hpp>
#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/managed_memory_resource.hpp>
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/mr/device/pool_memory_resource.hpp>
#else
#include <arrow/memory_pool.h>
#endif

namespace cura::execution {

namespace detail {

-template <typename T> struct LightWeightMemoryResource : public MemoryResource {
-explicit LightWeightMemoryResource(const Option &option) {
#ifdef USE_CUDF
template <typename PT, template <typename> typename LT>
#else
template <typename T>
#endif
struct LightWeightMemoryResource : public MemoryResource {
explicit LightWeightMemoryResource(const Option &option)
#ifdef USE_CUDF
: orig_default(nullptr) {
#else
{
#endif
#ifdef USE_CUDF
p = std::make_unique<PT>();
if (option.memory_resource_size) {
l = std::make_unique<LT<PT>>(p.get(), option.memory_resource_size,
option.memory_resource_size);
} else {
l = std::make_unique<LT<PT>>(p.get());
}
CURA_ASSERT(l, "LightWeightMemoryResource allocation failed");

if (option.exclusive_default_memory_resource) {
orig_default = rmm::mr::set_current_device_resource(l.get());
}
#else
l = arrow::MemoryPool::CreateDefault();
#endif
}

-~LightWeightMemoryResource() {}
~LightWeightMemoryResource() {
#ifdef USE_CUDF
if (orig_default) {
rmm::mr::set_current_device_resource(orig_default);
}
#endif
}

Underlying *preConcatenate(ThreadId thread_id) const override {
return l.get();
@@ -39,13 +78,28 @@ template <typename T> struct LightWeightMemoryResource : public MemoryResource {
void reclaimConverge() override {}

private:
#ifdef USE_CUDF
std::unique_ptr<PT> p;
std::unique_ptr<LT<PT>> l;
Underlying *orig_default;
#else
std::unique_ptr<T> l;
#endif
};

#ifdef USE_CUDF
template <typename PT, template <typename> typename LT>
#else
template <typename T>
#endif
struct LightWeightPerThreadMemoryResource : public MemoryResource {
LightWeightPerThreadMemoryResource(const Option &option)
-: thread_ls(option.threads_per_pipeline) {
:
#ifdef USE_CUDF
thread_ps(option.threads_per_pipeline), orig_default(nullptr),
#endif
thread_ls(option.threads_per_pipeline) {
#ifdef USE_CUDF
CURA_ASSERT(option.memory_resource_size,
"LightWeightPerThreadMemoryResource size must not be zero");
CURA_ASSERT(
@@ -71,9 +125,21 @@ struct LightWeightPerThreadMemoryResource : public MemoryResource {
CURA_ASSERT(thread_ls[i], "LightWeightPerThreadMemoryResource "
"per thread allocation failed");
}
#else
l = arrow::MemoryPool::CreateDefault();
for (size_t i = 0; i < option.threads_per_pipeline; i++) {
thread_ls[i] = arrow::MemoryPool::CreateDefault();
}
#endif
}

-~LightWeightPerThreadMemoryResource() {}
~LightWeightPerThreadMemoryResource() {
#ifdef USE_CUDF
if (orig_default) {
rmm::mr::set_current_device_resource(orig_default);
}
#endif
}

Underlying *preConcatenate(ThreadId thread_id) const override {
CURA_ASSERT(thread_id < thread_ls.size(),
@@ -101,13 +167,25 @@ struct LightWeightPerThreadMemoryResource : public MemoryResource {
void reclaimConverge() override {}

private:
#ifdef USE_CUDF
std::unique_ptr<PT> p;
std::vector<std::unique_ptr<PT>> thread_ps;
std::unique_ptr<LT<PT>> l;
std::vector<std::unique_ptr<LT<PT>>> thread_ls;
Underlying *orig_default;
#else
std::unique_ptr<T> l;
std::vector<std::unique_ptr<T>> thread_ls;
#endif
};

template <typename T> struct PrimitiveMemoryResource : public MemoryResource {
explicit PrimitiveMemoryResource(const Option &option) {
#ifdef USE_CUDF
mr = std::make_unique<T>();
#else
mr = arrow::MemoryPool::CreateDefault();
#endif
}

Underlying *preConcatenate(ThreadId thread_id) const override {
@@ -135,6 +213,25 @@ template <typename T> struct PrimitiveMemoryResource : public MemoryResource {
std::unique_ptr<T> mr;
};

#ifdef USE_CUDF
using ManagedUnderlying = rmm::mr::managed_memory_resource;
using CudaUnderlying = rmm::mr::cuda_memory_resource;
using LightWeightPrimitiveUnderlying = CudaUnderlying;
template <typename T> using ArenaUnderlying = rmm::mr::arena_memory_resource<T>;
template <typename T> using PoolUnderlying = rmm::mr::pool_memory_resource<T>;
using ArenaMemoryResource =
LightWeightMemoryResource<LightWeightPrimitiveUnderlying, ArenaUnderlying>;
using ArenaPerThreadMemoryResource =
LightWeightPerThreadMemoryResource<LightWeightPrimitiveUnderlying,
ArenaUnderlying>;
using PoolMemoryResource =
LightWeightMemoryResource<LightWeightPrimitiveUnderlying, PoolUnderlying>;
using PoolPerThreadMemoryResource =
LightWeightPerThreadMemoryResource<LightWeightPrimitiveUnderlying,
PoolUnderlying>;
using ManagedMemoryResource = PrimitiveMemoryResource<ManagedUnderlying>;
using CudaMemoryResource = PrimitiveMemoryResource<CudaUnderlying>;
#else
using ArenaMemoryResource = LightWeightMemoryResource<arrow::MemoryPool>;
using ArenaPerThreadMemoryResource =
LightWeightPerThreadMemoryResource<arrow::MemoryPool>;
@@ -143,6 +240,7 @@ using PoolPerThreadMemoryResource =
LightWeightPerThreadMemoryResource<arrow::MemoryPool>;
using ManagedMemoryResource = PrimitiveMemoryResource<arrow::MemoryPool>;
using CudaMemoryResource = PrimitiveMemoryResource<arrow::MemoryPool>;
#endif

} // namespace detail
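For reference, the PT/LT layering encoded by the aliases above stacks an RMM suballocating resource (arena or pool) on a primitive CUDA resource. A minimal sketch with a hypothetical fixed 1 GiB pool, outside of any Option plumbing:

rmm::mr::cuda_memory_resource cuda_mr; // PT: primitive upstream resource
rmm::mr::pool_memory_resource<rmm::mr::cuda_memory_resource> pool_mr(
    &cuda_mr, 1 << 30, 1 << 30); // LT<PT>: fixed initial/maximum pool size
auto *previous = rmm::mr::set_current_device_resource(&pool_mr);
// ... device allocations through the current resource now hit the pool ...
rmm::mr::set_current_device_resource(previous); // restore, as the destructors above do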

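The per-thread variant follows the same layering but constructs one light-weight resource per pipeline thread. A simplified sketch with per-thread arenas over a single shared CUDA resource (the commit itself also keeps per-thread primitive resources in thread_ps; sizes and thread count here are assumptions):

rmm::mr::cuda_memory_resource cuda_mr; // shared upstream for this sketch
constexpr std::size_t size_per_thread = 256 << 20; // assumed 256 MiB per thread
std::vector<std::unique_ptr<
    rmm::mr::arena_memory_resource<rmm::mr::cuda_memory_resource>>> arenas;
for (size_t i = 0; i < 2; i++) { // assumed threads_per_pipeline == 2
  arenas.push_back(std::make_unique<
      rmm::mr::arena_memory_resource<rmm::mr::cuda_memory_resource>>(
      &cuda_mr, size_per_thread, size_per_thread));
}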
64 changes: 64 additions & 0 deletions src/execution/pipeline.cpp
@@ -7,6 +7,10 @@
#include <sstream>
#include <string>

#ifdef USE_CUDF
#include <cudf/interop.hpp>
#endif

namespace cura::execution {

namespace detail {
@@ -163,6 +167,54 @@ void stringifyTreeHorizontal(
line_map.emplace(root, first_line);
}

#ifdef USE_CUDF
std::shared_ptr<Fragment> copyToCudf(const Context &ctx, ThreadId thread_id,
std::shared_ptr<const Fragment> fragment) {
if (!fragment) {
return nullptr;
}

auto rb = fragment->arrow();
Schema fragment_schema;
{
for (size_t i = 0; i < rb->num_columns(); i++) {
const auto &field = rb->schema()->field(i);
fragment_schema.emplace_back(field->type(), field->nullable());
}
}
const auto &arrow_table =
CURA_GET_ARROW_RESULT(arrow::Table::FromRecordBatches({rb}));
auto cudf_table = cudf::from_arrow(
*arrow_table, ctx.memory_resource->preConcatenate(thread_id));
return std::make_shared<Fragment>(fragment_schema, std::move(cudf_table));
}

std::shared_ptr<Fragment>
copyToArrow(std::shared_ptr<const Fragment> fragment) {
if (!fragment) {
return nullptr;
}

auto tv = fragment->cudf();
std::vector<cudf::column_metadata> metas(tv.num_columns());
auto arrow_table = cudf::to_arrow(tv, metas);
std::vector<std::shared_ptr<arrow::Array>> arrow_cols(
arrow_table->num_columns());
{
const auto &arrow_chunks = arrow_table->columns();
std::transform(arrow_chunks.begin(), arrow_chunks.end(), arrow_cols.begin(),
[](const auto &chunk) {
CURA_ASSERT(chunk->num_chunks() == 1,
"Invalid arrow table from cudf");
return chunk->chunk(0);
});
}
auto rb = arrow::RecordBatch::Make(
arrow_table->schema(), arrow_table->num_rows(), std::move(arrow_cols));
return std::make_shared<Fragment>(rb);
}
#endif

} // namespace detail
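Together these helpers make every pipeline hop an explicit host/device round trip: copyToCudf materializes an Arrow record batch as a cudf::table in device memory, and copyToArrow brings a device table_view back as a single-chunk Arrow record batch. The same round trip in isolation, assuming a prepared std::shared_ptr<arrow::Table> arrow_table:

// Host -> device: copy the Arrow table into device memory via the
// current RMM device resource.
std::unique_ptr<cudf::table> gpu_table =
    cudf::from_arrow(*arrow_table, rmm::mr::get_current_device_resource());
// Device -> host: convert back; each resulting Arrow column is expected
// to be a single chunk, mirroring the assertion in copyToArrow above.
std::vector<cudf::column_metadata> metas(gpu_table->view().num_columns());
std::shared_ptr<arrow::Table> host_table =
    cudf::to_arrow(gpu_table->view(), metas);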

Pipeline::Pipeline(PipelineId id_, bool is_final_,
@@ -196,7 +248,12 @@ void Pipeline::push(const Context &ctx, ThreadId thread_id, SourceId source_id,
CURA_ASSERT(!fragment,
"push an heap source with not-null fragment is not allowed");
}
#ifdef USE_CUDF
auto cudf = detail::copyToCudf(ctx, thread_id, fragment);
getSource(source_id)->push(ctx, thread_id, VoidKernelId, cudf);
#else
getSource(source_id)->push(ctx, thread_id, VoidKernelId, fragment);
#endif
}

std::shared_ptr<const Fragment>
@@ -207,8 +264,15 @@ Pipeline::stream(const Context &ctx, ThreadId thread_id, SourceId source_id,
CURA_ASSERT(!fragment,
"stream an heap source with not-null fragment is not allowed");
}
#ifdef USE_CUDF
auto cudf = detail::copyToCudf(ctx, thread_id, fragment);
auto ret =
getSource(source_id)->stream(ctx, thread_id, VoidKernelId, cudf, rows);
return detail::copyToArrow(ret);
#else
return getSource(source_id)->stream(ctx, thread_id, VoidKernelId, fragment,
rows);
#endif
}

std::string Pipeline::toString() const {
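Callers are unaffected by the device copies: push and stream still accept and return Arrow-backed fragments, and the transfers happen inside the pipeline. A hypothetical call site (pipeline, ctx, thread_id, source_id, host_fragment, and rows are assumed to be in scope):

pipeline.push(ctx, thread_id, source_id, host_fragment); // copied to the GPU internally
auto out = pipeline.stream(ctx, thread_id, source_id, nullptr, rows);
// out, if non-null, has already been copied back into an Arrow-backed Fragment.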
