CL/HIER: bcast 2step algorithm #620

Merged: 8 commits, Feb 1, 2023
Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/clang-tidy.yaml
@@ -28,7 +28,7 @@ jobs:
- name: Build UCC
run: |
./autogen.sh
-        CC=clang-${CLANG_VER} CXX=clang++-${CLANG_VER} ./configure --prefix=/tmp/ucc/install --with-ucx=/tmp/ucx/install
+        CC=clang-${CLANG_VER} CXX=clang++-${CLANG_VER} ./configure --prefix=/tmp/ucc/install --with-ucx=/tmp/ucx/install --enable-assert
bear --cdb /tmp/compile_commands.json make
- name: Run clang-tidy
run: |
8 changes: 7 additions & 1 deletion src/components/cl/hier/Makefile.am
@@ -20,6 +20,11 @@ barrier = \
barrier/barrier.h \
barrier/barrier.c

bcast = \
bcast/bcast.h \
bcast/bcast.c \
bcast/bcast_2step.c

sources = \
cl_hier.h \
cl_hier.c \
@@ -31,7 +36,8 @@ sources = \
$(allreduce) \
$(alltoallv) \
$(alltoall) \
-    $(barrier)
+    $(barrier) \
+    $(bcast)

module_LTLIBRARIES = libucc_cl_hier.la
libucc_cl_hier_la_SOURCES = $(sources)
17 changes: 17 additions & 0 deletions src/components/cl/hier/bcast/bcast.c
@@ -0,0 +1,17 @@
/**
* Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#include "bcast.h"
#include "../bcast/bcast.h"

ucc_base_coll_alg_info_t
ucc_cl_hier_bcast_algs[UCC_CL_HIER_BCAST_ALG_LAST + 1] = {
[UCC_CL_HIER_BCAST_ALG_2STEP] =
{.id = UCC_CL_HIER_BCAST_ALG_2STEP,
.name = "2step",
.desc = "intra-node and inter-node bcasts executed in parallel"},
[UCC_CL_HIER_BCAST_ALG_LAST] = {
.id = 0, .name = NULL, .desc = NULL}};
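
The two steps the algorithm is named for appear in the schedule construction in bcast_2step.c below: an inter-node bcast across the NODE_LEADERS subgroup and an intra-node bcast within the NODE subgroup, with the intra-node step chained after the inter-node one on ranks that do not yet hold the data.
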
38 changes: 38 additions & 0 deletions src/components/cl/hier/bcast/bcast.h
@@ -0,0 +1,38 @@
/**
* Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#ifndef BCAST_H_
#define BCAST_H_
#include "../cl_hier.h"

enum
{
UCC_CL_HIER_BCAST_ALG_2STEP,
UCC_CL_HIER_BCAST_ALG_LAST,
};

extern ucc_base_coll_alg_info_t
ucc_cl_hier_bcast_algs[UCC_CL_HIER_BCAST_ALG_LAST + 1];

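/* Default tuning: use the 2step algorithm for bcasts of 0 to 4k bytes */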
#define UCC_CL_HIER_BCAST_DEFAULT_ALG_SELECT_STR "bcast:0-4k:@2step"

ucc_status_t ucc_cl_hier_bcast_2step_init(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_coll_task_t **task);

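/* Map an algorithm name to its id; returns UCC_CL_HIER_BCAST_ALG_LAST
 * when the name is not recognized */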
static inline int ucc_cl_hier_bcast_alg_from_str(const char *str)
{
int i;

for (i = 0; i < UCC_CL_HIER_BCAST_ALG_LAST; i++) {
if (0 == strcasecmp(str, ucc_cl_hier_bcast_algs[i].name)) {
break;
}
}
return i;
}

#endif
263 changes: 263 additions & 0 deletions src/components/cl/hier/bcast/bcast_2step.c
@@ -0,0 +1,263 @@
/**
* Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*
* See file LICENSE for terms.
*/

#include "bcast.h"
#include "core/ucc_team.h"
#include "../cl_hier_coll.h"

static ucc_status_t ucc_cl_hier_bcast_2step_start(ucc_coll_task_t *task)
{
UCC_CL_HIER_PROFILE_REQUEST_EVENT(task, "cl_hier_bcast_2step_start", 0);
return ucc_schedule_start(task);
}

static ucc_status_t ucc_cl_hier_bcast_2step_finalize(ucc_coll_task_t *task)
{
ucc_schedule_t *schedule = ucc_derived_of(task, ucc_schedule_t);
ucc_status_t status;

UCC_CL_HIER_PROFILE_REQUEST_EVENT(task, "cl_hier_bcast_2step_finalize",
0);
status = ucc_schedule_finalize(task);
ucc_cl_hier_put_schedule(schedule);
return status;
}

static ucc_status_t
ucc_cl_hier_bcast_2step_schedule_finalize(ucc_coll_task_t *task)
{
ucc_cl_hier_schedule_t *schedule =
ucc_derived_of(task, ucc_cl_hier_schedule_t);
ucc_status_t status;

status = ucc_schedule_pipelined_finalize(&schedule->super.super.super);
ucc_cl_hier_put_schedule(&schedule->super.super);
return status;
}

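/* Fragment setup callback for the pipelined schedule: retarget each task
 * of the fragment at the count and buffer offset of fragment frag_num
 * within the full source buffer */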
static ucc_status_t
ucc_cl_hier_bcast_2step_frag_setup(ucc_schedule_pipelined_t *schedule_p,
ucc_schedule_t *frag, int frag_num)
{
ucc_coll_args_t *args = &schedule_p->super.super.bargs.args;
size_t dt_size = ucc_dt_size(args->src.info.datatype);
int n_frags = schedule_p->super.n_tasks;
size_t frag_count, frag_offset;
ucc_coll_task_t *task;
int i;

frag_count =
ucc_buffer_block_count(args->src.info.count, n_frags, frag_num);
frag_offset =
ucc_buffer_block_offset(args->src.info.count, n_frags, frag_num);

for (i = 0; i < frag->n_tasks; i++) {
task = frag->tasks[i];
task->bargs.args.src.info.count = frag_count;
task->bargs.args.src.info.buffer =
PTR_OFFSET(args->src.info.buffer, frag_offset * dt_size);
}
return UCC_OK;
}

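/* Return the rank index within the NODE_LEADERS subgroup of the leader
 * residing on the root's host; UCC_RANK_INVALID if not found */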
static inline ucc_rank_t
find_root_net_rank(ucc_host_id_t root_host_id, ucc_cl_hier_team_t *cl_team)
{
ucc_sbgp_t *sbgp = cl_team->sbgps[UCC_HIER_SBGP_NODE_LEADERS].sbgp;
ucc_team_t *core_team = cl_team->super.super.params.team;
ucc_rank_t i, rank;

for (i = 0; i < sbgp->group_size; i++) {
rank = ucc_ep_map_eval(sbgp->map, i);
if (ucc_team_rank_host_id(rank, core_team) == root_host_id) {
return i;
}
}
return UCC_RANK_INVALID;
}

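/* Return the root's rank index within the intra-node (NODE) subgroup;
 * UCC_RANK_INVALID if the root is not on this node */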
static inline ucc_rank_t
find_root_node_rank(ucc_rank_t root, ucc_cl_hier_team_t *cl_team)
{
ucc_sbgp_t *sbgp = cl_team->sbgps[UCC_HIER_SBGP_NODE].sbgp;
ucc_rank_t i;

for (i = 0; i < sbgp->group_size; i++) {
if (ucc_ep_map_eval(sbgp->map, i) == root) {
return i;
}
}
return UCC_RANK_INVALID;
}

static ucc_status_t
ucc_cl_hier_bcast_2step_init_schedule(ucc_base_coll_args_t *coll_args,
ucc_base_team_t *team,
ucc_schedule_t **sched_p, int n_frags)
{
ucc_cl_hier_team_t *cl_team = ucc_derived_of(team, ucc_cl_hier_team_t);
ucc_team_t *core_team = team->params.team;
ucc_coll_task_t *tasks[2] = {NULL, NULL};
ucc_rank_t root = coll_args->args.root;
ucc_rank_t rank = UCC_TL_TEAM_RANK(cl_team);
int root_on_local_node = ucc_team_ranks_on_same_node(root, rank,
core_team);
ucc_base_coll_args_t args = *coll_args;
int n_tasks = 0;
int first_task = 0;
ucc_schedule_t *schedule;
ucc_status_t status;
int i;

schedule = &ucc_cl_hier_get_schedule(cl_team)->super.super;
if (ucc_unlikely(!schedule)) {
return UCC_ERR_NO_MEMORY;
}
status = ucc_schedule_init(schedule, &args, team);
if (ucc_unlikely(UCC_OK != status)) {
goto out;
}

if (n_frags > 1) {
args.max_frag_count =
ucc_buffer_block_count(args.args.src.info.count, n_frags, 0);
args.mask |= UCC_BASE_CARGS_MAX_FRAG_COUNT;
}

ucc_assert(SBGP_ENABLED(cl_team, NODE_LEADERS) ||
SBGP_ENABLED(cl_team, NODE));
if (SBGP_ENABLED(cl_team, NODE_LEADERS)) {
args.args.root = find_root_net_rank(
ucc_team_rank_host_id(root, core_team), cl_team);
status = ucc_coll_init(SCORE_MAP(cl_team, NODE_LEADERS), &args,
&tasks[n_tasks]);
if (ucc_unlikely(UCC_OK != status)) {
goto out;
}
n_tasks++;
if (root_on_local_node && (root != rank)) {
Reviewer (Contributor) commented on the line above:

    Seems like this condition is always false:
    if root != rank, then root is on a remote node, because we are inside
    if (SBGP_ENABLED(cl_team, NODE_LEADERS)); if root_on_local_node, then
    root == rank for the same reason. In that case you don't need
    first_task, because first_task is always 0.

Author (Collaborator) replied:

    Yes, I believe you are right. I re-checked and tested.

first_task = 1;
}
}

if (SBGP_ENABLED(cl_team, NODE)) {
args.args.root = root_on_local_node
? find_root_node_rank(root, cl_team)
: core_team->topo->node_leader_rank_id;
status =
ucc_coll_init(SCORE_MAP(cl_team, NODE), &args, &tasks[n_tasks]);
if (ucc_unlikely(UCC_OK != status)) {
goto out;
}
n_tasks++;
}

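/* Dependency wiring: the first task is posted when the schedule starts.
 * On the root both tasks start right away, since the root already has the
 * data for both subgroups; on all other ranks the second task (intra-node
 * bcast) waits for the first (inter-node bcast) to complete. */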
ucc_task_subscribe_dep(&schedule->super, tasks[first_task],
UCC_EVENT_SCHEDULE_STARTED);
ucc_schedule_add_task(schedule, tasks[first_task]);
if (n_tasks > 1) {
if (root == rank) {
ucc_task_subscribe_dep(&schedule->super, tasks[(first_task + 1) % 2],
UCC_EVENT_SCHEDULE_STARTED);
} else {
ucc_task_subscribe_dep(tasks[first_task],
tasks[(first_task + 1) % 2], UCC_EVENT_COMPLETED);
}
ucc_schedule_add_task(schedule, tasks[(first_task + 1) % 2]);
}

schedule->super.post = ucc_cl_hier_bcast_2step_start;
schedule->super.progress = NULL;
schedule->super.finalize = ucc_cl_hier_bcast_2step_finalize;
schedule->super.triggered_post = ucc_triggered_post;
*sched_p = schedule;
return UCC_OK;

out:
for (i = 0; i < n_tasks; i++) {
tasks[i]->finalize(tasks[i]);
}
ucc_cl_hier_put_schedule(schedule);
return status;
}

static ucc_status_t ucc_cl_hier_bcast_2step_frag_init(
ucc_base_coll_args_t *coll_args, ucc_schedule_pipelined_t *sp,
ucc_base_team_t *team, ucc_schedule_t **frag_p)
{
int n_frags = sp->super.n_tasks;

return ucc_cl_hier_bcast_2step_init_schedule(coll_args, team, frag_p,
n_frags);
}

static ucc_status_t ucc_cl_hier_2step_bcast_start(ucc_coll_task_t *task)
{
ucc_schedule_pipelined_t *schedule =
ucc_derived_of(task, ucc_schedule_pipelined_t);

cl_debug(task->team->context->lib,
"posting 2step bcast, buf %p, count %zd, dt %s"
" pdepth %d, frags_total %d",
task->bargs.args.src.info.buffer,
task->bargs.args.src.info.count,
ucc_datatype_str(task->bargs.args.src.info.datatype),
schedule->n_frags, schedule->super.n_tasks);

return ucc_schedule_pipelined_post(task);
}

UCC_CL_HIER_PROFILE_FUNC(ucc_status_t, ucc_cl_hier_bcast_2step_init,
(coll_args, team, task),
ucc_base_coll_args_t *coll_args, ucc_base_team_t *team,
ucc_coll_task_t **task)
{
ucc_cl_hier_team_t *cl_team = ucc_derived_of(team, ucc_cl_hier_team_t);
ucc_cl_hier_lib_config_t *cfg = &UCC_CL_HIER_TEAM_LIB(cl_team)->cfg;
ucc_cl_hier_schedule_t * schedule;
int n_frags, pipeline_depth;
ucc_status_t status;

if (UCC_IS_PERSISTENT(coll_args->args)) {
return UCC_ERR_NOT_SUPPORTED;
}
ucc_pipeline_nfrags_pdepth(&cfg->bcast_2step_pipeline,
coll_args->args.src.info.count *
ucc_dt_size(coll_args->args.src.info.datatype),
&n_frags, &pipeline_depth);

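/* Message fits in a single fragment: skip pipelining and return a plain
 * 2step schedule directly */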
if (n_frags == 1) {
return ucc_cl_hier_bcast_2step_init_schedule(
coll_args, team, (ucc_schedule_t **)task, n_frags);
}

schedule = ucc_cl_hier_get_schedule(cl_team);
if (ucc_unlikely(!schedule)) {
return UCC_ERR_NO_MEMORY;
}

status = ucc_schedule_pipelined_init(
coll_args, team, ucc_cl_hier_bcast_2step_frag_init,
ucc_cl_hier_bcast_2step_frag_setup, pipeline_depth, n_frags,
cfg->bcast_2step_pipeline.order, &schedule->super);

if (ucc_unlikely(status != UCC_OK)) {
cl_error(team->context->lib,
"failed to init pipelined 2step bcast schedule");
goto err_pipe_init;
}

schedule->super.super.super.post = ucc_cl_hier_2step_bcast_start;
schedule->super.super.super.triggered_post = ucc_triggered_post;
schedule->super.super.super.finalize =
ucc_cl_hier_bcast_2step_schedule_finalize;
*task = &schedule->super.super.super;
return UCC_OK;

err_pipe_init:
ucc_cl_hier_put_schedule(&schedule->super.super);
return status;
}
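
For context, a minimal sketch of how an application reaches this path through the public UCC API. The helper name do_bcast, the INT8 datatype, and the polling loop are illustrative assumptions, not part of this PR; with the default select string above, a host-memory bcast of up to 4k bytes on a multi-node team would be dispatched to this 2step implementation.

#include <ucc/api/ucc.h>

/* Hypothetical helper: post a bcast on an existing UCC team and wait for
 * completion. Assumes team/ctx were created beforehand on a multi-node
 * job, so CL/HIER can form its NODE and NODE_LEADERS subgroups. */
static ucc_status_t do_bcast(ucc_team_h team, ucc_context_h ctx,
                             void *buf, uint64_t count, uint64_t root)
{
    ucc_coll_req_h  req;
    ucc_coll_args_t args = {
        .mask      = 0,
        .coll_type = UCC_COLL_TYPE_BCAST,
        .root      = root,
        .src.info  = {.buffer   = buf,
                      .count    = count,
                      .datatype = UCC_DT_INT8,
                      .mem_type = UCC_MEMORY_TYPE_HOST},
    };
    ucc_status_t    st;

    st = ucc_collective_init(&args, &req, team);
    if (st != UCC_OK) {
        return st;
    }
    st = ucc_collective_post(req);
    if (st == UCC_OK) {
        /* Drive progress until both 2step tasks have completed */
        do {
            ucc_context_progress(ctx);
            st = ucc_collective_test(req);
        } while (st == UCC_INPROGRESS);
    }
    ucc_collective_finalize(req);
    return st;
}
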
7 changes: 7 additions & 0 deletions src/components/cl/hier/cl_hier.c
@@ -63,6 +63,11 @@ static ucc_config_field_t ucc_cl_hier_lib_config_table[] = {
ucc_offsetof(ucc_cl_hier_lib_config_t, allreduce_rab_pipeline),
UCC_CONFIG_TYPE_PIPELINE_PARAMS},

{"BCAST_2STEP_PIPELINE", "n",
"Pipelining settings for RAB bcast algorithm",
ucc_offsetof(ucc_cl_hier_lib_config_t, bcast_2step_pipeline),
UCC_CONFIG_TYPE_PIPELINE_PARAMS},

{NULL}};

static ucs_config_field_t ucc_cl_hier_context_config_table[] = {
@@ -102,4 +107,6 @@ __attribute__((constructor)) static void cl_hier_iface_init(void)
ucc_cl_hier_alltoall_algs;
ucc_cl_hier.super.alg_info[ucc_ilog2(UCC_COLL_TYPE_ALLTOALLV)] =
ucc_cl_hier_alltoallv_algs;
ucc_cl_hier.super.alg_info[ucc_ilog2(UCC_COLL_TYPE_BCAST)] =
ucc_cl_hier_bcast_algs;
}
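
A usage note, assuming the "n" default for UCC pipeline parameters means "no pipelining": unless UCC_CL_HIER_BCAST_2STEP_PIPELINE is set to enable fragmentation, ucc_pipeline_nfrags_pdepth computes n_frags == 1 and ucc_cl_hier_bcast_2step_init takes the plain, non-pipelined schedule path shown above.
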
2 changes: 1 addition & 1 deletion src/components/cl/hier/cl_hier.h
@@ -52,7 +52,7 @@ typedef struct ucc_cl_hier_lib_config {
size_t a2av_node_thresh;
ucc_pipeline_params_t allreduce_split_rail_pipeline;
ucc_pipeline_params_t allreduce_rab_pipeline;

ucc_pipeline_params_t bcast_2step_pipeline;
} ucc_cl_hier_lib_config_t;

typedef struct ucc_cl_hier_context_config {