Merge pull request #2237 from FedML-AI/charlie/dev/v0.7.0
Charlie/dev/v0.7.0
charlieyl authored Dec 17, 2024
2 parents 56f6059 + 9fa8499 commit 2055d68
Showing 6 changed files with 77 additions and 9 deletions.
8 changes: 6 additions & 2 deletions python/fedml/computing/scheduler/comm_utils/hardware_utils.py
@@ -56,13 +56,17 @@ def __get_util(cls) -> Optional[GPUCardUtil]:
@staticmethod
def get_gpus() -> List[GPUCard]:
gpu_util = HardwareUtil.__get_util()
return gpu_util.get_gpu_cards() if gpu_util is not None else []
cards = gpu_util.get_gpu_cards() if gpu_util is not None else []
logging.info(f"hardware_utils Available GPU cards len ---> { len(cards)}")
return cards

@staticmethod
def get_available_gpu_ids(order: str = "memory", limit: int = 1, max_load: float = 0.01,
max_memory: float = 0.01) -> List[int]:
gpu_util = HardwareUtil.__get_util()
return gpu_util.get_available_gpu_card_ids(order, limit, max_load, max_memory) if gpu_util is not None else []
card_ids = gpu_util.get_available_gpu_card_ids(order, limit, max_load, max_memory) if gpu_util is not None else []
logging.info(f"hardware_utils get_available_gpu_ids ids ---> {card_ids}, limit ---> {limit}")
return card_ids

@staticmethod
def get_docker_gpu_device_mapping(gpu_ids: Optional[List[int]], num_gpus: int = 0) -> Optional[Dict]:
37 changes: 30 additions & 7 deletions python/fedml/computing/scheduler/comm_utils/job_utils.py
@@ -86,21 +86,37 @@ def occupy_gpu_ids(self, run_id, request_gpu_num, device_id, inner_id=None,
# Get the available GPU list, FEDML_GLOBAL_DEVICE_AVAILABLE_GPU_IDS_TAG-${device_id}
available_gpu_ids = ComputeCacheManager.get_instance().get_gpu_cache().get_device_available_gpu_ids(
device_id)
logging.info(
f"Available GPU Ids fetched from cache: {available_gpu_ids}")
logging.info(f"Available GPU Ids fetched from cache: {available_gpu_ids}")

logging.info(f"Check worker({device_id})'s realtime gpu availability in DB"
f" for run {run_id}: {available_gpu_ids}")

# Get realtime GPU availability list from the system
realtime_available_gpu_ids = JobRunnerUtils.get_realtime_gpu_available_ids().copy()
logging.info(f"Cache not set yet, fetching realtime available GPU Ids: {realtime_available_gpu_ids}")

# If the available GPU list is not in the cache, set it to the current system available GPU list
if available_gpu_ids is None or available_gpu_ids == []:
if available_gpu_ids is None:
# Get realtime GPU availability list from the system
available_gpu_ids = JobRunnerUtils.get_realtime_gpu_available_ids().copy()
logging.info(f"Cache not set yet, fetching realtime available GPU Ids: {available_gpu_ids}")
available_gpu_ids = realtime_available_gpu_ids
else:
available_gpu_ids = JobRunnerUtils.trim_unavailable_gpu_ids(available_gpu_ids)
logging.info(
f"Trimmed available GPU Ids: {available_gpu_ids}")
logging.info(f"Trimmed available GPU Ids: {available_gpu_ids}")

initial_available_gpu_ids = ComputeCacheManager.get_instance().get_gpu_cache().get_device_initial_available_gpu_ids(
device_id)
# Calculate the difference between realtime_available_gpu_ids and initial_available_gpu_ids;
# if the difference is not empty, add it to the available GPU ids.
diff_gpu_ids = list(set(realtime_available_gpu_ids) - set(initial_available_gpu_ids))
if diff_gpu_ids:
available_gpu_ids.extend(diff_gpu_ids)
available_gpu_ids = list(set(available_gpu_ids))
available_gpu_ids.sort()
logging.info(f"Device {device_id} available GPU ids is changed because of the system gpu resource change, "
f"initial available gpu ids: {initial_available_gpu_ids}, "
f"realtime available gpu ids: {realtime_available_gpu_ids}, "
f"diff gpu ids: {diff_gpu_ids}, "
f"new available gpu ids: {available_gpu_ids}")

# Get the matched gpu ids string by the request gpu num
cuda_visible_gpu_ids_str, matched_gpu_num = JobRunnerUtils.request_gpu_ids(request_gpu_num,
@@ -321,6 +337,9 @@ def get_available_gpu_id_list(device_id):
# Get realtime GPU availability list from the system
gpu_ids = JobRunnerUtils.get_realtime_gpu_available_ids().copy()
ComputeCacheManager.get_instance().get_gpu_cache().set_device_available_gpu_ids(device_id, gpu_ids)
# Set the initial available GPU ids in the cache, used to check whether the device's available GPU ids changed because of a system resource change
ComputeCacheManager.get_instance().get_gpu_cache().set_device_initial_available_gpu_ids(device_id, gpu_ids)
logging.info(f"Set device {device_id} initial available GPU ids: {gpu_ids}")
available_gpu_ids = gpu_ids
return available_gpu_ids
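
The two cache writes above implement a lazy initialization: the first probe seeds both the mutable availability list and a frozen initial snapshot used later for drift detection. A generic sketch of the pattern (the function name and dict-backed cache are hypothetical, not FedML's API):

# On first access, seed both the mutable list and the frozen baseline.
def get_or_seed_available_ids(cache, device_id, probe_realtime_ids):
    ids = cache.get(f"available-{device_id}")
    if ids is None:
        ids = probe_realtime_ids()                 # ask the system once
        cache[f"available-{device_id}"] = list(ids)
        cache[f"initial-{device_id}"] = list(ids)  # baseline for drift checks
    return ids

cache = {}
print(get_or_seed_available_ids(cache, 7, lambda: [0, 1]))  # [0, 1]
print(cache)  # both keys seeded from the same probe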

@@ -339,6 +358,9 @@ def reset_available_gpu_id_list(device_id):
current_available_gpu_ids = JobRunnerUtils.get_realtime_gpu_available_ids().copy()
ComputeCacheManager.get_instance().get_gpu_cache().set_device_available_gpu_ids(device_id,
current_available_gpu_ids)
# Set the initial available GPU ids in the cache, used to check whether the device's available GPU ids changed because of a system resource change
ComputeCacheManager.get_instance().get_gpu_cache().set_device_initial_available_gpu_ids(device_id, current_available_gpu_ids)

gpu_list = sys_utils.get_gpu_list()
ComputeCacheManager.get_instance().get_gpu_cache().set_device_total_num_gpus(device_id, len(gpu_list))
except Exception as e:
@@ -351,6 +373,7 @@ def get_realtime_gpu_available_ids():
gpu_list = sys_utils.get_gpu_list()
gpu_count = len(gpu_list)
realtime_available_gpu_ids = sys_utils.get_available_gpu_id_list(limit=gpu_count)
logging.info(f"get_available_gpu_id_list limit:{gpu_count}, available_gpu_ids:{realtime_available_gpu_ids}")
return realtime_available_gpu_ids

@staticmethod
1 change: 1 addition & 0 deletions python/fedml/computing/scheduler/comm_utils/sys_utils.py
@@ -191,6 +191,7 @@ def get_available_gpu_id_list(limit=1) -> List[int]:

gpu_available_list = HardwareUtil.get_available_gpu_ids(order='memory', limit=limit, max_load=0.01,
max_memory=0.01)
logging.info(f"GPU available ids from HardwareUtil.get_available_gpu_ids, limit --> {limit}")
return gpu_available_list
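
For reference, a card only counts as available here when its load and memory utilization are both under 1%. A hedged sketch of the equivalent selection with the GPUtil package directly (an assumption about what the NVIDIA backend of HardwareUtil wraps, not a verbatim excerpt):

import GPUtil  # pip install gputil

# Cards under 1% load and 1% memory use, ordered by memory utilization;
# `limit` caps how many ids come back (the caller passes the total GPU count).
available_ids = GPUtil.getAvailable(order="memory", limit=8, maxLoad=0.01, maxMemory=0.01)
print(available_ids)  # e.g. [0, 2] on a machine where GPUs 0 and 2 sit idle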


@@ -398,6 +398,10 @@ def parse_resource_related_config(config, gpu_num_frm_platform=0):
num_gpus = 0

shm_size = config.get('shm_size', None)
# set shm_size to 8G if not specified
if not shm_size:
shm_size = "8G"

storage_opt = config.get('storage_opt', None)
tmpfs = config.get('tmpfs', None)
cpus = config.get('cpus', None)
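
The new default means every container gets an 8G /dev/shm unless the job config overrides it. A hedged sketch of how such a value typically reaches the container runtime via docker-py (the image and launch call are illustrative, not FedML's exact deployment path):

import docker

config = {}  # e.g. the parsed job YAML
shm_size = config.get("shm_size") or "8G"  # same effect as the default above

client = docker.from_env()
container = client.containers.run(
    "nvidia/cuda:12.2.0-base-ubuntu22.04",  # hypothetical image
    command="nvidia-smi",
    shm_size=shm_size,  # sets the size of /dev/shm inside the container
    detach=True,
)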
@@ -10,6 +10,7 @@ class ComputeGpuCache(object):
FEDML_GLOBAL_DEVICE_RUN_NUM_GPUS_TAG = "FEDML_GLOBAL_DEVICE_RUN_NUM_GPUS_TAG-"
FEDML_GLOBAL_DEVICE_RUN_GPU_IDS_TAG = "FEDML_GLOBAL_DEVICE_RUN_GPU_IDS_TAG-"
FEDML_GLOBAL_DEVICE_AVAILABLE_GPU_IDS_TAG = "FEDML_GLOBAL_DEVICE_AVAILABLE_GPU_IDS_TAG-"
FEDML_GLOBAL_DEVICE_INITIAL_AVAILABLE_GPU_IDS_TAG = "FEDML_GLOBAL_DEVICE_INITIAL_AVAILABLE_GPU_IDS_TAG-"
FEDML_GLOBAL_DEVICE_TOTAL_NUM_GPUS_TAG = "FEDML_GLOBAL_DEVICE_TOTAL_NUM_GPUS_TAG-"
FEDML_GLOBAL_RUN_TOTAL_NUM_GPUS_TAG = "FEDML_GLOBAL_RUN_TOTAL_NUM_GPUS_TAG-"
FEDML_GLOBAL_RUN_DEVICE_IDS_TAG = "FEDML_GLOBAL_RUN_DEVICE_IDS_TAG-"
@@ -107,6 +108,25 @@ def get_device_available_gpu_ids(self, device_id):
return []

return device_available_gpu_ids

def get_device_initial_available_gpu_ids(self, device_id):
# Get the initial available GPU ids from the cache, used to check whether the device's available GPU ids have changed
device_initial_available_gpu_ids = None
try:
if self.redis_connection.exists(self.get_device_initial_available_gpu_ids_key(device_id)):
device_initial_available_gpu_ids = self.redis_connection.get(self.get_device_initial_available_gpu_ids_key(device_id))
if str(device_initial_available_gpu_ids).strip() == "":
return []
except Exception as e:
pass

if device_initial_available_gpu_ids is not None and str(device_initial_available_gpu_ids).strip() != "":
device_initial_available_gpu_ids = device_initial_available_gpu_ids.split(',')
device_initial_available_gpu_ids = self.map_str_list_to_int_list(device_initial_available_gpu_ids)
else:
return []

return device_initial_available_gpu_ids

def get_device_total_num_gpus(self, device_id):
device_total_num_gpus = None
@@ -241,6 +261,14 @@ def set_device_available_gpu_ids(self, device_id, gpu_ids):
pass

ComputeGpuDatabase.get_instance().set_device_available_gpu_ids(device_id, gpu_ids)

def set_device_initial_available_gpu_ids(self, device_id, gpu_ids):
# Set the initial available GPU ids in the cache, used to check whether the device's available GPU ids have changed
try:
str_gpu_ids = self.map_list_to_str(gpu_ids)
self.redis_connection.set(self.get_device_initial_available_gpu_ids_key(device_id), str_gpu_ids)
except Exception as e:
pass
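
The getter/setter pair serializes the id list to a comma-separated string under a per-device key. A standalone round-trip sketch with redis-py (the key tag is from the class above; the device id and connection are hypothetical):

import redis

r = redis.Redis(decode_responses=True)  # decode bytes to str on reads
device_id = 42  # hypothetical device

key = f"FEDML_GLOBAL_DEVICE_INITIAL_AVAILABLE_GPU_IDS_TAG-{device_id}"
r.set(key, ",".join(str(i) for i in [0, 1, 2, 3]))  # stored as "0,1,2,3"

raw = r.get(key)
initial_ids = [int(i) for i in raw.split(",")] if raw else []
print(initial_ids)  # [0, 1, 2, 3]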

def set_device_total_num_gpus(self, device_id, num_gpus):
try:
@@ -311,6 +339,9 @@ def get_device_run_gpu_ids_key(device_id, run_id):

def get_device_available_gpu_ids_key(self, device_id):
return f"{ComputeGpuCache.FEDML_GLOBAL_DEVICE_AVAILABLE_GPU_IDS_TAG}{device_id}"

def get_device_initial_available_gpu_ids_key(self, device_id):
return f"{ComputeGpuCache.FEDML_GLOBAL_DEVICE_INITIAL_AVAILABLE_GPU_IDS_TAG}{device_id}"

def get_device_total_num_gpus_key(self, device_id):
return f"{ComputeGpuCache.FEDML_GLOBAL_DEVICE_TOTAL_NUM_GPUS_TAG}{device_id}"
@@ -340,10 +340,13 @@ def callback_report_device_info(self, topic, payload):
else:
total_mem, free_mem, total_disk_size, free_disk_size, cup_utilization, cpu_cores, gpu_cores_total, \
gpu_cores_available, sent_bytes, recv_bytes, gpu_available_ids = sys_utils.get_sys_realtime_stats()
logging.info(f"GPU available ids from get_sys_realtime_stats --> {gpu_available_ids}")
host_ip = sys_utils.get_host_ip()
host_port = sys_utils.get_available_port()
gpu_available_ids = JobRunnerUtils.get_available_gpu_id_list(self.edge_id)
logging.info(f"GPU available ids from get_available_gpu_id_list(device_id) --> {gpu_available_ids}")
gpu_available_ids = JobRunnerUtils.trim_unavailable_gpu_ids(gpu_available_ids)
logging.info(f"GPU available ids from trim_unavailable_gpu_ids --> {gpu_available_ids}")
gpu_cores_available = len(gpu_available_ids)
gpu_list = sys_utils.get_gpu_list()
device_info_json = {
@@ -379,6 +382,8 @@
"edge_info": device_info_json}
if context is not None:
response_payload["context"] = context

logging.info(f"Response payload --> {response_payload}")
self.message_center.send_message(response_topic, json.dumps(response_payload), run_id=run_id)

def callback_request_device_info_from_mlops(self, topic, payload):
