Skip to content

coll/tuned dynamic file in json format #13104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions contrib/coll_tuned_rulefile_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#!/usr/bin/env python3

# Copyright (c) 2024-2025 Amazon.com, Inc. or its affiliates.
# All Rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow

import re
import json
from collections import OrderedDict

coll_dict = {
'allgather' : 0,
'allgatherv' : 1,
'allreduce' : 2,
'alltoall' : 3,
'alltoallv' : 4,
'alltoallw' : 5,
'barrier' : 6,
'bcast' : 7,
'exscan' : 8,
'gather' : 9,
'gatherv' : 10,
'reduce' : 11,
'reducescatter' : 12,
'reducescatterblock' : 13,
'scan' : 14,
'scatter' : 15,
'scatterv' : 16,
'neighbor_allgather' : 17,
'neighbor_allgatherv' : 18,
'neighbor_alltoall' : 19,
'neighbor_alltoallv' : 20,
'neighbor_alltoallw' : 21 }
coll_dict_rev = { v:k for k,v in coll_dict.items() }

han_component_dict = {
"self" : 0,
"basic" : 1,
"libnbc" : 2,
"tuned" : 3,
"sm" : 4,
"adapt" : 5,
"han" : 6,
}

han_topo_level_dict = {
'intra_node' : 0,
'inter_node' : 1,
'global_communicator' : 2,
}


def strip_comments(line):
return re.sub(r"#.*","",line).strip()

class GenericOpenMPIRuleReader():
def __init__(self, fp, fname_for_prints=""):
self.fp = fp
# The 1-indexed line number which corresponds to the next byte of fp read.
self.jline = 1
self.line_start = 0
def get_next_line(self):
while True:
self.line_start = self.fp.tell()
line = self.fp.readline()
if not line: return None
self.jline += 1
if strip_comments(line):
return line

def isnext_digit(self):
# ompi_coll_base_file_peek_next_char_isdigit
tell = self.fp.tell()
while True:
next = self.fp.read(1)
if next in ' \t':
tell += 1
continue
self.fp.seek(tell)
return next in '0123456789'

def get_next(self):
# (ompi_coll_base_file_getnext_long)
while True:
line = self.get_next_line()
if not line: return None
UNK = -1
jnum_start = UNK
jnum_end = UNK
for jc in range(len(line)):
if line[jc] in "#":
break
if line[jc] in '0123456789':
if jnum_start == UNK:
jnum_start = jc
jnum_end = jc
else:
if jnum_end != UNK:
break
if jnum_end != UNK:
self.fp.seek(self.line_start+jnum_end+1)
# decrement the line number, the next read will continue on this line.
self.jline -= 1
return int(line[jnum_start:jnum_end+1])

def read_header(self):
line = self.get_next_line()
match = re.match("rule-file-version-([0-9])", line)
if match:
return int(match.group(1))
else:
self.jline -= 1
self.fp.seek(self.line_start)
return 1

class TunedRuleReader(GenericOpenMPIRuleReader):
def load_rulefile(self):
json_root = OrderedDict()
file_ver = self.read_header()
json_root['rule_file_version'] = 3
json_root['module'] = 'tuned'
json_root['collectives'] = OrderedDict()

ncollectives = self.get_next()
for jcol in range(ncollectives):
coll_id = self.get_next()
coll_name = coll_dict_rev[coll_id]
comm_rules = []
ncomm_sizes = self.get_next()
for jcomm_size in range(ncomm_sizes):
comm_size = self.get_next()
nmsg_sizes = self.get_next()
comm_rule = OrderedDict()
comm_rule['comm_size_min'] = 0
if jcomm_size+1 < ncomm_sizes:
comm_rule['comm_size_max'] = max(comm_size-1, 0)
if jcomm_size > 0:
comm_rule['comm_size_min'] = comm_rules[jcomm_size-1]['comm_size_max'] + 1
msg_rules = []
for jmsg in range(nmsg_sizes):
msg_size = self.get_next()
result_alg = self.get_next()
result_topo_faninout = self.get_next()
result_segsize = self.get_next()
rule = OrderedDict()
rule['msg_size_min'] = msg_size
if jmsg < nmsg_sizes - 1:
rule['msg_size_max'] = 'Inf'
if jmsg > 0:
msg_rules[jmsg-1]['msg_size_max'] = msg_size - 1
rule['alg'] = result_alg
if result_topo_faninout != 0:
rule['faninout'] = result_topo_faninout
if result_segsize != 0:
rule['segsize'] = result_segsize
result_maxreq = 0
if file_ver > 1 and self.isnext_digit():
result_maxreq = self.get_next()
if result_maxreq != 0:
rule['reqs'] = result_maxreq
msg_rules.append(rule)
comm_rule['rules'] = msg_rules
comm_rules.append(comm_rule)
json_root['collectives'][coll_name] = comm_rules
return json_root

class TunedRuleWriter():
def __init__(self):
pass
def to_file(json_rules):
for coll in coll_dict.keys():
if coll in json_rules['collectives']:
pass

if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--input","-i", type=argparse.FileType('r'), required=True)
# parser.add_argument("--output","-o",type=argparse.FileType('w'), required=True)

args = parser.parse_args()
reader = TunedRuleReader(args.input)
print(json.dumps(reader.load_rulefile(), indent=4))
67 changes: 65 additions & 2 deletions docs/tuning-apps/coll-tuned.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,77 @@ after.
.. code-block:: sh

shell$ mpirun ... --mca coll_tuned_use_dynamic_rules 1 \
--mca coll_tuned_dynamic_rules_filename /path/to/my_rules.conf ...
--mca coll_tuned_dynamic_rules_filename /path/to/my_rules.json ...

The loaded set of rules then are used to select the algorithm
to use based on the collective, the communicator size, and the message size.
Collectives for which rules have not be specified in the file will make use of
the *fixed decision* rules as usual.

Dynamic tuning files are organized in this format:
Starting with Open MPI 6.0, dynamic tuning files can be specified in JSON
format, although the classic format will still be accepted. A converter script
is also available to transfer classic format files into JSON.

The JSON format can be checked using the schema in
`docs/tuning-apps/tuned_dynamic_file_schema.json`. If your editor supports it,
this schema may provide validation of your file along with helpful tooltips for
each variable.

An example file is shown here:

.. code-block:: json

{
"$schema": "tuned_schema.json",
"rule_file_version" : 3,
"module" : "tuned",
"collectives" : {
"allreduce" :
[
{
"comm_size_min" : 64,
"comm_size_max" : 128,
"rules" : [
{
"msg_size_min" : 512,
"msg_size_max" : 511999,
"alg" : 2,
},
{
"msg_size_min" : 512000,
"msg_size_max" : "inf",
"alg" : "recursive_doubling",
"reqs" : 8
}
]
}
]
}
}

In this toy example the MPI_Allreduce collective (indicated by the `allreduce`
field) has two algorithms that will only be used on communicators with between
64 and 128 ranks. Additionally, those rules only apply to certain message
sizes. All others communicator sizes or message sizes fall back to the default
set of rules, and collectives other than MPI_Allreduce are not affected.

Unlike in the classic file format, there is no need to specify a default rule or
specify rules in increasing order. Overlapping message sizes or communicator
sizes are allowed, and won't emit warnings.

The process for selecting the matching rule is a simple first-match principle.
During communicator creation, the first set of communicator-rules which
satisfies the requirements (`comm_size_min`/`comm_size_max`) is selected. Then,
during each collective call, the message size is used to find the first matching
entry in the "rules" list.

The algorithm selected is indicated by the `alg` field. It may be either an
integer mapping to the classic file format, or a string. In both cases, the
value is checked against the appropriate coll_tuned_<collectived>_algorithm MCA
parameter, and un-recognized values will cause the rule to be ignored.


Classic file format:

.. code-block:: sh
:linenos:
Expand Down
130 changes: 130 additions & 0 deletions docs/tuning-apps/tuned_dynamic_file_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
{
"$schema": "https://json-schema.org/draft/2019-09/schema#",
"title": "OpenMPITunedRules",
"description": "Defines configuration for the Open MPI Tuned module to select which collective algorithms will be used depending on comm size, message size, etc.",
"type": "object",
"required": ["rule_file_version","module","collectives"],
"additionalProperties" : false,
"properties": {
"rule_file_version": {
"description": "The version of this configuration file",
"type": "number"
},
"module": {
"description": "The collective module intended to use these rules (tuned)",
"type": "string"
},
"$schema": {
"description": "The schema used for validation",
"type": "string"
},
"collectives" : {
"description": "The collectives, each with their own rules. Each collective is indicated by a lowercase property such as \"allgather\"",
"type": "object",
"additionalProperties" : false,
"patternProperties": {
"^(allgather|allreduce|alltoall|alltoallv|alltoallw|barrier)$": {
"type" : "array",
"items": { "$ref" : "#/$defs/comm_size_rule" }
},
"^(bcast|exscan|gather|gatherv|reduce|reducescatter|reducescatterblock)$": {
"type" : "array",
"items": { "$ref" : "#/$defs/comm_size_rule" }
},
"^(scan|scatter|scatterv|neighbor_allgather|neighbor_allgatherv)$": {
"type" : "array",
"items": { "$ref" : "#/$defs/comm_size_rule" }
},
"^(neighbor_alltoall|neighbor_alltoallv|neighbor_alltoallw)$": {
"type" : "array",
"items": { "$ref" : "#/$defs/comm_size_rule" }
}
}
}
},

"$defs": {
"msg_size_rule": {
"type": "object",
"required": ["alg"],
"additionalProperties" : false,
"properties" : {
"msg_size_min" : {
"description" : "The smallest message size in bytes this rule applies to",
"anyOf" : { "$ref" : "#/$defs/int_or_inf" }
},
"msg_size_max" : {
"description" : "The largest message size (inclusive) in bytes this rule applies to",
"anyOf" : { "$ref" : "#/$defs/int_or_inf" }
},
"alg" : {
"description" : "The algorithm to use for this collective. Integer or name, see coll_tuned_<collective>_algorithm for options.",
"type" : [ "string", "integer"]
},
"reqs" : {
"description" : "Algorithm parameter: Use this many requests. Some algorithms may ignore this option.",
"type" : [ "integer"]
},
"faninout" : {
"description" : "Algorithm parameter: Fan in and/or out by this much. Some algorithms may ignore this option.",
"type" : [ "integer"]
}
}
},

"comm_size_rule": {
"type": "object",
"required": ["rules"],
"additionalProperties" : false,
"properties" : {
"comm_size_min" : {
"description" : "The smallest size communicator these rules apply to",
"anyOf" : { "$ref" : "#/$defs/int_or_inf" }
},
"comm_size_max" : {
"description" : "The largest (inclusive) size communicator these rules apply to",
"anyOf" : { "$ref" : "#/$defs/int_or_inf" }
},
"comm_rank_distribution" : {
"description" : "A description of how the ranks are distributed within the communicator",
"enum" : ["any", "one-per-node", "single-node"]
},

"rules" : {
"description" : "A list of rules. The first matching rule is selected. If no match is found, defaults are used.",
"type" : "array",
"items": { "$ref" : "#/$defs/msg_size_rule" }
}
}
},
"collective_identifier": {
"enum" : [
"allgather",
"allreduce",
"alltoall",
"alltoallv",
"alltoallw",
"barrier",
"bcast",
"exscan",
"gather",
"gatherv",
"reduce",
"reducescatter",
"reducescatterblock",
"scan",
"scatter",
"scatterv",
"neighbor_allgather",
"neighbor_allgatherv",
"neighbor_alltoall",
"neighbor_alltoallv",
"neighbor_alltoallw"
]
},
"int_or_inf": [
{ "type" : "integer" },
{ "enum": ["inf","INF","Inf"] }
]
}
}
Loading