feature: centralize databases for improved consistency
fix: resolve GPU-related bugs when launching scenarios
FerTV committed Dec 17, 2024
1 parent aebcbbb commit 6fee315
Showing 9 changed files with 96 additions and 51 deletions.
8 changes: 8 additions & 0 deletions app/main.py
@@ -76,6 +76,14 @@
help="Config directory path",
)

+argparser.add_argument(
+    "-d",
+    "--database",
+    dest="databases",
+    default="/opt/nebula",
+    help="Nebula databases path",
+)

argparser.add_argument(
"-l",
"--logs",
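For reference, a minimal standalone sketch of how the new option behaves (hypothetical test values; the dest and default mirror the hunk above):

```python
import argparse

argparser = argparse.ArgumentParser()
argparser.add_argument(
    "-d",
    "--database",
    dest="databases",
    default="/opt/nebula",
    help="Nebula databases path",
)

# `python app/main.py --database /data/nebula` yields
# args.databases == "/data/nebula"; omitting the flag
# falls back to /opt/nebula.
args = argparser.parse_args(["--database", "/data/nebula"])
assert args.databases == "/data/nebula"
assert argparser.parse_args([]).databases == "/opt/nebula"
```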
2 changes: 1 addition & 1 deletion nebula/addons/waf/Dockerfile-waf
@@ -36,7 +36,7 @@ RUN cp nginx-modules/nginx-1.24.0/objs/ngx_http_geoip2_module.so /usr/lib/nginx/
RUN cp nginx-modules/nginx-1.24.0/objs/ngx_stream_geoip2_module.so /usr/lib/nginx/modules

# geoip2 database downloaded
-RUN wget https://git.io/GeoLite2-Country.mmdb -P /usr/share/GeoIP/
+RUN wget https://github.com/P3TERX/GeoLite.mmdb/raw/download/GeoLite2-Country.mmdb -P /usr/share/GeoIP/

# nginx configuration files
COPY default.conf /etc/nginx/templates/conf.d/default.conf.template
2 changes: 1 addition & 1 deletion nebula/addons/waf/default.conf
@@ -12,7 +12,7 @@ server {
listen 80 default_server;

server_name localhost;
-set $upstream http://nebula-nebula-frontend; # Change this
+set $upstream http://nebula_nebula-frontend; # Change this
set $always_redirect off;
modsecurity on;
location /nebula {
24 changes: 14 additions & 10 deletions nebula/controller.py
@@ -339,6 +339,7 @@ def __init__(self, args):
self.statistics_port = args.statsport if hasattr(args, "statsport") else 8080
self.simulation = args.simulation
self.config_dir = args.config
+self.db_dir = args.databases if hasattr(args, "databases") else "/opt/nebula"
self.test = args.test if hasattr(args, "test") else False
self.log_dir = args.logs
self.cert_dir = args.certs
@@ -453,6 +454,7 @@ def start(self):
else:
self.run_frontend()
logging.info(f"NEBULA Frontend is running at http://localhost:{self.frontend_port}")
+logging.info(f"NEBULA Databases created in {self.db_dir}")

# Watchdog for running additional scripts in the host machine (i.e. during the execution of a federation)
event_handler = NebulaEventHandler()
@@ -516,7 +518,7 @@ def run_controller_api(self):
)

def run_waf(self):
-network_name = f"{os.environ['USER']}-nebula-net-base"
+network_name = f"{os.environ['USER']}_nebula-net-base"
base = DockerUtils.create_docker_network(network_name)

client = docker.from_env()
@@ -537,7 +539,7 @@ def run_waf(self):

container_id_waf = client.api.create_container(
image="nebula-waf",
-name=f"{os.environ['USER']}-nebula-waf",
+name=f"{os.environ['USER']}_nebula-waf",
detach=True,
volumes=volumes_waf,
host_config=host_config_waf,
@@ -571,7 +573,7 @@ def run_waf(self):

container_id = client.api.create_container(
image="nebula-waf-grafana",
-name=f"{os.environ['USER']}-nebula-waf-grafana",
+name=f"{os.environ['USER']}_nebula-waf-grafana",
detach=True,
environment=environment,
host_config=host_config,
@@ -595,7 +597,7 @@ def run_waf(self):

container_id_loki = client.api.create_container(
image="nebula-waf-loki",
-name=f"{os.environ['USER']}-nebula-waf-loki",
+name=f"{os.environ['USER']}_nebula-waf-loki",
detach=True,
command=command,
host_config=host_config_loki,
@@ -619,7 +621,7 @@ def run_waf(self):

container_id_promtail = client.api.create_container(
image="nebula-waf-promtail",
-name=f"{os.environ['USER']}-nebula-waf-promtail",
+name=f"{os.environ['USER']}_nebula-waf-promtail",
detach=True,
volumes=volumes_promtail,
host_config=host_config_promtail,
@@ -646,7 +648,7 @@ def run_frontend(self):
except Exception:
logging.info("No GPU available for the frontend, nodes will be deployed in CPU mode")

-network_name = f"{os.environ['USER']}-nebula-net-base"
+network_name = f"{os.environ['USER']}_nebula-net-base"

# Create the Docker network
base = DockerUtils.create_docker_network(network_name)
@@ -681,6 +683,7 @@ def run_frontend(self):
f"{self.root_path}:/nebula",
"/var/run/docker.sock:/var/run/docker.sock",
f"{self.root_path}/nebula/frontend/config/nebula:/etc/nginx/sites-available/default",
+f"{self.db_dir}/databases:/nebula/nebula/frontend/databases"
],
extra_hosts={"host.docker.internal": "host-gateway"},
port_bindings={80: self.frontend_port, 8080: self.statistics_port},
@@ -692,7 +695,7 @@ def run_frontend(self):

container_id = client.api.create_container(
image="nebula-frontend",
-name=f"{os.environ['USER']}-nebula-frontend",
+name=f"{os.environ['USER']}_nebula-frontend",
detach=True,
environment=environment,
volumes=volumes,
@@ -708,16 +711,17 @@ def run_test(self):

@staticmethod
def stop_waf():
-DockerUtils.remove_containers_by_prefix(f"{os.environ['USER']}-nebula-waf")
+DockerUtils.remove_containers_by_prefix(f"{os.environ['USER']}_nebula-waf")

@staticmethod
def stop():
logging.info("Closing NEBULA (exiting from components)... Please wait")
-DockerUtils.remove_containers_by_prefix(f"{os.environ['USER']}")
+DockerUtils.remove_containers_by_prefix(f"{os.environ['USER']}_nebula")
+DockerUtils.remove_containers_by_prefix(f"nebula_")
ScenarioManagement.stop_blockchain()
ScenarioManagement.stop_participants()
Controller.stop_waf()
-DockerUtils.remove_docker_networks_by_prefix(f"{os.environ['USER']}")
+DockerUtils.remove_docker_networks_by_prefix(f"{os.environ['USER']}_")
controller_pid_file = os.path.join(os.path.dirname(__file__), "controller.pid")
try:
with open(controller_pid_file) as f:
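The stop logic above leans on prefix matching, so the underscore in the new names matters: removing by `{USER}_nebula` can no longer catch another user's containers whose names merely start with the same username. A minimal sketch of what a prefix-based cleanup helper could look like (an illustration using the Docker SDK for Python; the project's actual DockerUtils implementation may differ):

```python
import docker

def remove_containers_by_prefix(prefix: str) -> None:
    """Force-remove every container whose name starts with prefix."""
    client = docker.from_env()
    for container in client.containers.list(all=True):
        # Match on the container name as reported by the SDK.
        if container.name.startswith(prefix):
            container.remove(force=True)
```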
4 changes: 2 additions & 2 deletions nebula/core/training/lightning.py
@@ -185,7 +185,7 @@ def create_trainer(self):
self._trainer = Trainer(
callbacks=[ModelSummary(max_depth=1), NebulaProgressBar()],
max_epochs=self.epochs,
-accelerator=self.config.participant["device_args"]["accelerator"],
+accelerator="gpu",
devices=gpu_index,
logger=self._logger,
enable_checkpointing=False,
@@ -197,7 +197,7 @@ def create_trainer(self):
self._trainer = Trainer(
callbacks=[ModelSummary(max_depth=1), NebulaProgressBar()],
max_epochs=self.epochs,
-accelerator=self.config.participant["device_args"]["accelerator"],
+accelerator="cpu",
devices="auto",
logger=self._logger,
enable_checkpointing=False,
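The net effect of both hunks: the accelerator string is now hard-coded per branch instead of read from the participant config, which could disagree with the devices actually visible inside the container. A condensed sketch of the resulting logic (assuming PyTorch Lightning 2.x; the import path and the gpu_index handling are illustrative):

```python
from lightning.pytorch import Trainer

def build_trainer(use_gpu: bool, gpu_index, epochs: int, logger) -> Trainer:
    if use_gpu:
        # GPU branch: explicit accelerator plus explicit device indices.
        return Trainer(max_epochs=epochs, accelerator="gpu",
                       devices=gpu_index, logger=logger,
                       enable_checkpointing=False)
    # CPU branch: let Lightning choose how many CPU processes to use.
    return Trainer(max_epochs=epochs, accelerator="cpu",
                   devices="auto", logger=logger,
                   enable_checkpointing=False)
```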
73 changes: 48 additions & 25 deletions nebula/frontend/app.py
@@ -472,15 +472,15 @@ async def check_enough_resources():
resources = await get_host_resources()

mem_percent = resources.get("memory_percent")
-gpu_memory_percent = resources.get("gpu_memory_percent", [])
+# gpu_memory_percent = resources.get("gpu_memory_percent", [])

# if cpu_percent >= settings.resources_threshold or mem_percent >= settings.resources_threshold:
if mem_percent >= settings.resources_threshold:
return False
-elif len(gpu_memory_percent) > 0:
-    for gpu_mem in gpu_memory_percent:
-        if gpu_mem >= settings.resources_threshold:
-            return False
+# elif len(gpu_memory_percent) > 0:
+#     for gpu_mem in gpu_memory_percent:
+#         if gpu_mem >= settings.resources_threshold:
+#             return False

return True

@@ -493,17 +493,17 @@ async def monitor_resources(user):
if not enough_resources:
running_scenario = get_running_scenario(user)
if running_scenario:
-# Wich card has big memory consumption
-gpu = await get_least_memory_gpu()
-# Stop scenario if is using the high memory gpu
+# # Which card has high memory consumption
+# gpu = await get_least_memory_gpu()
+# # Stop the scenario if it is using the high-memory GPU
running_scenario_as_dict = dict(running_scenario)
-if running_scenario_as_dict["gpu_id"] == gpu.get("available_gpu_index"):
-    scenario_name = running_scenario_as_dict["name"]
-    stop_scenario(scenario_name, user)
-    user_data.scenarios_list_length -= 1
-    user_data.finish_scenario_event.set()
+scenario_name = running_scenario_as_dict["name"]
+# if running_scenario_as_dict["gpu_id"] == gpu.get("available_gpu_index"):
+stop_scenario(scenario_name, user)
+user_data.scenarios_list_length -= 1
+user_data.finish_scenario_event.set()

-await asyncio.sleep(5)
+await asyncio.sleep(20)



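In short, the monitor no longer tries to attribute memory pressure to a particular GPU: whenever resources run short, the user's running scenario is stopped outright, and the polling interval grows from 5 s to 20 s. A condensed sketch of the new loop (a hypothetical standalone form of the coroutine above; the user_data bookkeeping is omitted):

```python
import asyncio

async def monitor_resources(user):
    while True:
        if not await check_enough_resources():
            running = get_running_scenario(user)
            if running:
                # Stop unconditionally; no per-GPU attribution anymore.
                stop_scenario(dict(running)["name"], user)
        await asyncio.sleep(20)
```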
@@ -876,9 +876,9 @@ def stop_scenario(scenario_name, user):
from nebula.scenarios import ScenarioManagement

ScenarioManagement.stop_participants(scenario_name)
-DockerUtils.remove_containers_by_prefix(f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-{user}-participant")
+DockerUtils.remove_containers_by_prefix(f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_{user}-participant")
DockerUtils.remove_docker_network(
-f"{(os.environ.get('NEBULA_CONTROLLER_NAME'))}-{str(user).lower()}-nebula-net-scenario"
+f"{(os.environ.get('NEBULA_CONTROLLER_NAME'))}_{str(user).lower()}-nebula-net-scenario"
)
ScenarioManagement.stop_blockchain()
scenario_set_status_to_finished(scenario_name)
@@ -1227,19 +1227,42 @@ async def node_stopped(scenario_name: str, request: Request):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)


async def assign_available_gpu(scenario_data, role):
+available_gpus = []

if scenario_data["accelerator"] == "cpu":
scenario_data["gpu_id"] = []
else:
response = await get_available_gpus()
-available_gpus = response.get("available_gpus")
-if len(available_gpus) > 0:
-    if role == "user":
-        scenario_data["gpu_id"] = [available_gpus.pop()]
-    elif role == "admin":
-        scenario_data["gpu_id"] = available_gpus
-    else:
-        scenario_data["gpu_id"] = []
+# Obtain available system GPUs
+available_system_gpus = response.get("available_gpus")
+running_scenarios = get_running_scenario(get_all=True)
+# Obtain currently used GPUs
+if running_scenarios:
+    running_gpus = []
+    for scenario in running_scenarios:
+        scenario_gpus = json.loads(scenario["gpu_id"])
+        for gpu in scenario_gpus:
+            if gpu not in running_gpus:
+                running_gpus.append(gpu)
+
+    # Obtain GPUs that are not in use
+    for gpu in available_system_gpus:
+        if gpu not in running_gpus:
+            available_gpus.append(gpu)
+else:
+    available_gpus = available_system_gpus
+
+# Assign GPUs based on user role
+if len(available_gpus) > 0:
+    if role == "user":
+        scenario_data["gpu_id"] = [available_gpus.pop()]
+    elif role == "admin":
+        scenario_data["gpu_id"] = available_gpus
+    else:
+        scenario_data["gpu_id"] = []
+else:
+    scenario_data["gpu_id"] = []

return scenario_data

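A condensed, pure-function view of the assignment policy above (a hypothetical helper, not code from the repository): GPUs claimed by running scenarios are filtered out first, then a "user" receives one free GPU while an "admin" receives all of them.

```python
def pick_gpus(system_gpus: list, in_use: set, role: str) -> list:
    """Return the GPU ids a scenario with the given role may claim."""
    free = [g for g in system_gpus if g not in in_use]
    if not free:
        return []
    if role == "user":
        return [free[-1]]  # mirrors available_gpus.pop() above
    if role == "admin":
        return free
    return []

# With GPUs 0-3 present and GPU 0 already claimed:
assert pick_gpus([0, 1, 2, 3], {0}, "user") == [3]
assert pick_gpus([0, 1, 2, 3], {0}, "admin") == [1, 2, 3]
```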
24 changes: 17 additions & 7 deletions nebula/frontend/database.py
@@ -1,5 +1,6 @@
import asyncio
import datetime
+import logging
import sqlite3

import aiosqlite
@@ -23,10 +24,15 @@


async def setup_database(db_file_location):
-async with aiosqlite.connect(db_file_location) as db:
-    for pragma in PRAGMA_SETTINGS:
-        await db.execute(pragma)
-    await db.commit()
+try:
+    async with aiosqlite.connect(db_file_location) as db:
+        for pragma in PRAGMA_SETTINGS:
+            await db.execute(pragma)
+        await db.commit()
+except PermissionError:
+    logging.info("No permission to create the databases. Change the default databases directory")
+except Exception as e:
+    logging.exception(f"An error has occurred during setup_database: {e}")


async def ensure_columns(conn, table_name, desired_columns):
@@ -545,7 +551,7 @@ def scenario_set_status_to_completed(scenario_name):
print(f"Database error: {e}")


-def get_running_scenario(username=None):
+def get_running_scenario(username=None, get_all=False):
with sqlite3.connect(scenario_db_file_location) as conn:
conn.row_factory = sqlite3.Row
c = conn.cursor()
@@ -556,11 +562,15 @@ def get_running_scenario(username=None, get_all=False):
WHERE (status = ? OR status = ?) AND username = ?;
"""
c.execute(command, ("running", "completed", username))

result = c.fetchone()
else:
command = "SELECT * FROM scenarios WHERE status = ? OR status = ?;"
c.execute(command, ("running", "completed"))

-result = c.fetchone()
+if get_all:
+    result = c.fetchall()
+else:
+    result = c.fetchone()

return result

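A short usage sketch of the extended signature (illustrative names; assumes the scenario database is already populated):

```python
from nebula.frontend.database import get_running_scenario

# Single row (or None) for one user's running/completed scenario.
row = get_running_scenario(username="alice")

# New: every running/completed scenario across all users; this is what
# assign_available_gpu() uses to collect the GPUs currently claimed.
rows = get_running_scenario(get_all=True)
```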
Empty file.
10 changes: 5 additions & 5 deletions nebula/scenarios.py
@@ -253,7 +253,7 @@ def __init__(self, scenario, user=None):

# Assign the controller endpoint
if self.scenario.deployment == "docker":
-self.controller = f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-nebula-frontend"
+self.controller = f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_nebula-frontend"
else:
self.controller = f"127.0.0.1:{os.environ.get('NEBULA_FRONTEND_PORT')}"

@@ -646,7 +648,7 @@ def start_nodes_docker(self):
logging.info("Starting nodes using Docker Compose...")
logging.info(f"env path: {self.env_path}")

-network_name = f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-{str(self.user).lower()}-nebula-net-scenario"
+network_name = f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_{str(self.user).lower()}-nebula-net-scenario"

# Create the Docker network
base = DockerUtils.create_docker_network(network_name)
@@ -658,7 +658,7 @@ def start_nodes_docker(self):
container_ids = []
for idx, node in enumerate(self.config.participants):
image = "nebula-core"
-name = f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-{self.user}-participant{node['device_args']['idx']}"
+name = f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_{self.user}-participant{node['device_args']['idx']}"

if node["device_args"]["accelerator"] == "gpu":
environment = {"NVIDIA_DISABLE_REQUIRE": True}
@@ -691,15 +691,15 @@ def start_nodes_docker(self):
f"{network_name}": client.api.create_endpoint_config(
ipv4_address=f"{base}.{i}",
),
-f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-nebula-net-base": client.api.create_endpoint_config(),
+f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_nebula-net-base": client.api.create_endpoint_config(),
"chainnet": client.api.create_endpoint_config(),
})
else:
networking_config = client.api.create_networking_config({
f"{network_name}": client.api.create_endpoint_config(
ipv4_address=f"{base}.{i}",
),
-f"{os.environ.get('NEBULA_CONTROLLER_NAME')}-nebula-net-base": client.api.create_endpoint_config(),
+f"{os.environ.get('NEBULA_CONTROLLER_NAME')}_nebula-net-base": client.api.create_endpoint_config(),
})

node["tracking_args"]["log_dir"] = "/nebula/app/logs"
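Taken together, the renames standardize NEBULA's Docker naming on an underscore after the controller or user prefix, matching the `nebula_nebula-frontend` upstream set in the WAF config above. A hypothetical illustration of the resulting scheme (names only, not repository code; in controller.py the prefix is os.environ['USER'], while the frontend and scenario code use NEBULA_CONTROLLER_NAME, so the combined fallback here is illustrative):

```python
import os

prefix = os.environ.get("NEBULA_CONTROLLER_NAME") or os.environ.get("USER", "nebula")
user = "alice"

base_net = f"{prefix}_nebula-net-base"
scenario_net = f"{prefix}_{user}-nebula-net-scenario"
frontend = f"{prefix}_nebula-frontend"
participant = f"{prefix}_{user}-participant0"
```

The unambiguous separator is what makes the prefix-based cleanup in controller.py safe: a prefix like `alice_nebula` cannot match resources belonging to a user named, say, `alice2`.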
