Skip to content

Commit

Permalink
Discard temporary scratch data by setting the flow read-only
Browse files Browse the repository at this point in the history
Set the flow to read-only at the end of the task for flows
that are marked as temporary. This allows PaRSEC to put the data
into a read-only queue that does not need to be pushed back to the host.
Temporary data does not need to be evicted, only released.

Signed-off-by: Joseph Schuchart <joseph.schuchart@stonybrook.edu>
  • Loading branch information
devreal committed Oct 16, 2024
1 parent 9535f7f commit ea6f47b
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 15 deletions.
18 changes: 12 additions & 6 deletions ttg/ttg/device/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace ttg::device {
impl_data_t impl_data;
ttg::scope scope;
bool is_const;
bool is_scratch;
};

template <typename... Ts>
Expand All @@ -31,10 +32,11 @@ namespace ttg::device {

/* extract buffer information from to_device_t */
template<typename... Ts, std::size_t... Is>
auto extract_buffer_data(detail::to_device_t<Ts...>& a) {
return std::array<device_input_data_t, sizeof...(Is)>{
auto extract_buffer_data(detail::to_device_t<Ts...>& a, std::index_sequence<Is...>) {
return std::array{
{TTG_IMPL_NS::buffer_data(std::get<Is>(a.ties)),
std::get<Is>(a.ties).scope(), std::get<Is>(a.ties)}...};
std::get<Is>(a.ties).scope(), std::is_const_v<std::tuple_element<Is, decltype(a.ties)>>,
ttg::meta::is_devicescratch_v<std::tuple_element<Is, decltype(a.ties)>>}...};
}
} // namespace detail

Expand All @@ -46,12 +48,16 @@ namespace ttg::device {
Input() { }
template<typename... Args>
Input(Args&&... args)
: m_data{{TTG_IMPL_NS::buffer_data(args), args.scope(), std::is_const_v<Args>}...}
: m_data{{TTG_IMPL_NS::buffer_data(args), args.scope(),
std::is_const_v<std::decay_t<Args>>,
ttg::meta::is_devicescratch_v<Args>}...}
{ }

/**
 * Append one buffer-like object to the input set.
 *
 * Records the backend buffer data of \c v, its scope, whether \c v is
 * const-qualified, and whether it is a device scratch.
 *
 * \param v  the object to add; it is only inspected, not moved from
 */
template<typename T>
void add(T&& v) {
  /* T is deduced as a reference for lvalues; drop only the reference so that
   * a const qualifier survives (std::decay_t would discard it as well) */
  using arg_type = std::remove_reference_t<T>;
  m_data.emplace_back(TTG_IMPL_NS::buffer_data(v), v.scope(), std::is_const_v<arg_type>,
                      ttg::meta::is_devicescratch_v<std::remove_cv_t<arg_type>>);
}

ttg::span<detail::device_input_data_t> span() {
Expand Down Expand Up @@ -610,7 +616,7 @@ namespace ttg::device {

template<typename... Ts>
ttg::suspend_always await_transform(detail::to_device_t<Ts...>&& a) {
auto arr = detail::extract_buffer_data(a);
auto arr = detail::extract_buffer_data(a, std::make_index_sequence<sizeof...(Ts)>{});
bool need_transfer = !(TTG_IMPL_NS::register_device_memory(ttg::span(arr)));
/* TODO: are we allowed to not suspend here and launch the kernel directly? */
m_state = ttg::device::detail::TTG_DEVICE_CORO_WAIT_TRANSFER;
Expand Down
9 changes: 7 additions & 2 deletions ttg/ttg/parsec/devicefunc.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ namespace ttg_parsec {
for (i = 0; i < span.size(); ++i) {
/* get_parsec_data is overloaded for buffer and devicescratch */
parsec_data_t* data = span[i].impl_data;
/* TODO: check whether the device is current */
bool is_const = span[i].is_const;
ttg::scope scope = span[i].scope;
bool is_const = span[i].is_const;
bool is_scratch = span[i].is_scratch;

if (nullptr != data) {
auto access = PARSEC_FLOW_ACCESS_RW;
Expand All @@ -139,6 +139,11 @@ namespace ttg_parsec {
access = PARSEC_FLOW_ACCESS_READ;
}

if (is_scratch) {
/* mark the flow as temporary so we can discard it easily */
access |= TTG_PARSEC_FLOW_ACCESS_TMP;
}

/* build the flow */
/* TODO: reuse the flows of the task class? How can we control the sync direction then? */
flows[i] = parsec_flow_t{.name = nullptr,
Expand Down
3 changes: 3 additions & 0 deletions ttg/ttg/parsec/parsec-ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@
/* HACK: we need this flag on a data copy to indicate whether it has been registered */
#define TTG_PARSEC_DATA_FLAG_REGISTERED ((parsec_data_flag_t)1<<2)

/* HACK: mark the flows of device scratch as temporary so that we can easily discard it.
 * The bit is ORed into the flow access mode of scratch data when the flows are built
 * and tested once the task body has run, at which point flagged flows are reset to
 * PARSEC_FLOW_ACCESS_READ.
 * NOTE(review): bit 7 is presumably chosen to stay clear of PaRSEC's own flow
 * access/flag bits -- confirm against the PaRSEC headers when updating PaRSEC. */
#define TTG_PARSEC_FLOW_ACCESS_TMP (1<<7)

#endif // TTG_PARSEC_EXT_H
21 changes: 14 additions & 7 deletions ttg/ttg/parsec/ttg.h
Original file line number Diff line number Diff line change
Expand Up @@ -1419,30 +1419,35 @@ namespace ttg_parsec {
int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
ttg::device::detail::set_current(device, cuda_stream->cuda_stream);
}
#endif // defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)

#if defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
#elif defined(PARSEC_HAVE_DEV_HIP_SUPPORT) && defined(TTG_HAVE_HIP)
{
parsec_hip_exec_stream_t *hip_stream = (parsec_hip_exec_stream_t *)gpu_stream;
int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
ttg::device::detail::set_current(device, hip_stream->hip_stream);
}
#endif // defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)

#if defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
#elif defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)
{
parsec_level_zero_exec_stream_t *stream;
stream = (parsec_level_zero_exec_stream_t *)gpu_stream;
int device = detail::parsec_device_to_ttg_device(gpu_device->super.device_index);
ttg::device::detail::set_current(device, stream->swq->queue);
}
#endif // defined(PARSEC_HAVE_DEV_CUDA_SUPPORT) && defined(TTG_HAVE_CUDA)
#endif // defined(PARSEC_HAVE_DEV_LEVEL_ZERO_SUPPORT) && defined(TTG_HAVE_LEVEL_ZERO)

/* Here we call back into the coroutine again after the transfers have completed */
static_op<Space>(&task->parsec_task);

ttg::device::detail::reset_current();

/* Reset every flow that was flagged as temporary to plain read-only access.
 * This overwrites all flow flags of such a flow (including the temporary
 * marker itself), so calling it more than once is harmless. */
auto discard_tmp_flows = [&](){
  for (int i = 0; i < MAX_PARAM_COUNT; ++i) {
    auto* flow = const_cast<parsec_flow_t*>(gpu_task->flow[i]);
    /* defensive: do not dereference slots that carry no flow
     * (whether unused slots can be null is not visible here) */
    if (nullptr == flow) continue;
    if (flow->flow_flags & TTG_PARSEC_FLOW_ACCESS_TMP) {
      /* temporary flow, discard by setting it to read-only to avoid evictions */
      flow->flow_flags = PARSEC_FLOW_ACCESS_READ;
    }
  }
};

/* we will come back into this function once the kernel and transfers are done */
int rc = PARSEC_HOOK_RETURN_DONE;
if (nullptr != task->suspended_task_address) {
Expand All @@ -1458,13 +1463,15 @@ namespace ttg_parsec {
ttg::device::detail::TTG_DEVICE_CORO_COMPLETE == dev_data.state()) {
/* the task started sending so we won't come back here */
//std::cout << "device_static_submit task " << task << " complete" << std::endl;
discard_tmp_flows();
} else {
//std::cout << "device_static_submit task " << task << " return-again" << std::endl;
rc = PARSEC_HOOK_RETURN_AGAIN;
}
} else {
/* the task is done so we won't come back here */
//std::cout << "device_static_submit task " << task << " complete" << std::endl;
discard_tmp_flows();
}
return rc;
}
Expand Down

0 comments on commit ea6f47b

Please sign in to comment.