
Commit

Merge pull request #2140 from FedML-AI/alaydshah/inference_gateway_logging

Inference Gateway Logs
alaydshah authored Jun 6, 2024
2 parents 2c7d434 + fd446b0 commit f487b12
Showing 11 changed files with 115 additions and 122 deletions.
42 changes: 19 additions & 23 deletions README.md
@@ -1,44 +1,40 @@

# FEDML Open Source: A Unified and Scalable Machine Learning Library for Running Training and Deployment Anywhere at Any Scale

Backed by FEDML Nexus AI: Next-Gen Cloud Services for LLMs & Generative AI (https://fedml.ai)
Backed by TensorOpera AI: Your Generative AI Platform at Scale (https://TensorOpera.ai)

<div align="center">
<img src="docs/images/fedml_logo_light_mode.png" width="400px">
<img src="docs/images/TensorOpera_arch.png" width="600px">
</div>

FedML Documentation: https://doc.fedml.ai
TensorOpera Documentation: https://docs.TensorOpera.ai

FedML Homepage: https://fedml.ai/ \
FedML Blog: https://blog.fedml.ai/ \
FedML Medium: https://medium.com/@FedML \
FedML Research: https://fedml.ai/research-papers/
TensorOpera Homepage: https://TensorOpera.ai/ \
TensorOpera Blog: https://blog.TensorOpera.ai/

Join the Community: \
Join the Community:
Slack: https://join.slack.com/t/fedml/shared_invite/zt-havwx1ee-a1xfOUrATNfc9DFqU~r34w \
Discord: https://discord.gg/9xkW8ae6RV


FEDML® stands for Foundational Ecosystem Design for Machine Learning. [FEDML Nexus AI](https://fedml.ai) is the next-gen cloud service for LLMs & Generative AI. It helps developers to *launch* complex model *training*, *deployment*, and *federated learning* anywhere on decentralized GPUs, multi-clouds, edge servers, and smartphones, *easily, economically, and securely*.
TensorOpera® AI (https://TensorOpera.ai) is the next-gen cloud service for LLMs & Generative AI. It helps developers to launch complex model training, deployment, and federated learning anywhere on decentralized GPUs, multi-clouds, edge servers, and smartphones, easily, economically, and securely.

Highly integrated with [FEDML open source library](https://github.com/fedml-ai/fedml), FEDML Nexus AI provides holistic support of three interconnected AI infrastructure layers: user-friendly MLOps, a well-managed scheduler, and high-performance ML libraries for running any AI jobs across GPU Clouds.
Highly integrated with TensorOpera open source library, TensorOpera AI provides holistic support of three interconnected AI infrastructure layers: user-friendly MLOps, a well-managed scheduler, and high-performance ML libraries for running any AI jobs across GPU Clouds.

![fedml-nexus-ai-overview.png](./docs/images/fedml-nexus-ai-overview.png)
A typical workflow is shown in the figure above. When a developer wants to run a pre-built job in Studio or Job Store, TensorOpera® Launch swiftly pairs the AI job with the most economical GPU resources, auto-provisions them, and runs the job effortlessly, eliminating complex environment setup and management. While the job is running, TensorOpera® Launch orchestrates the compute plane across different cluster topologies and configurations, so that any complex AI job is supported, whether it is model training, deployment, or even federated learning. TensorOpera® Open Source is a unified and scalable machine learning library for running these AI jobs anywhere at any scale.

A typical workflow is showing in figure above. When developer wants to run a pre-built job in Studio or Job Store, FEDML®Launch swiftly pairs AI jobs with the most economical GPU resources, auto-provisions, and effortlessly runs the job, eliminating complex environment setup and management. When running the job, FEDML®Launch orchestrates the compute plane in different cluster topologies and configuration so that any complex AI jobs are enabled, regardless model training, deployment, or even federated learning. FEDML®Open Source is unified and scalable machine learning library for running these AI jobs anywhere at any scale.
In the MLOps layer of TensorOpera AI
- **TensorOpera® Studio** embraces the power of Generative AI! Access popular open-source foundational models (e.g., LLMs), fine-tune them seamlessly with your specific data, and deploy them scalably and cost-effectively using TensorOpera Launch on the GPU marketplace.
- **TensorOpera® Job Store** maintains a list of pre-built jobs for training, deployment, and federated learning. Developers are encouraged to run them directly with customized datasets or models on cheaper GPUs.

In the MLOps layer of FEDML Nexus AI
- **FEDML® Studio** embraces the power of Generative AI! Access popular open-source foundational models (e.g., LLMs), fine-tune them seamlessly with your specific data, and deploy them scalably and cost-effectively using the FEDML Launch on GPU marketplace.
- **FEDML® Job Store** maintains a list of pre-built jobs for training, deployment, and federated learning. Developers are encouraged to run directly with customize datasets or models on cheaper GPUs.
In the scheduler layer of TensorOpera AI
- **TensorOpera® Launch** swiftly pairs AI jobs with the most economical GPU resources, auto-provisions, and effortlessly runs the job, eliminating complex environment setup and management. It supports a range of compute-intensive jobs for generative AI and LLMs, such as large-scale training, serverless deployments, and vector DB searches. TensorOpera Launch also facilitates on-prem cluster management and deployment on private or hybrid clouds.

In the scheduler layer of FEDML Nexus AI
- **FEDML® Launch** swiftly pairs AI jobs with the most economical GPU resources, auto-provisions, and effortlessly runs the job, eliminating complex environment setup and management. It supports a range of compute-intensive jobs for generative AI and LLMs, such as large-scale training, serverless deployments, and vector DB searches. FEDML Launch also facilitates on-prem cluster management and deployment on private or hybrid clouds.

In the Compute layer of FEDML Nexus AI
- **FEDML® Deploy** is a model serving platform for high scalability and low latency.
- **FEDML® Train** focuses on distributed training of large and foundational models.
- **FEDML® Federate** is a federated learning platform backed by the most popular federated learning open-source library and the world’s first FLOps (federated learning Ops), offering on-device training on smartphones and cross-cloud GPU servers.
- **FEDML® Open Source** is unified and scalable machine learning library for running these AI jobs anywhere at any scale.
In the Compute layer of TensorOpera AI
- **TensorOpera® Deploy** is a model serving platform for high scalability and low latency.
- **TensorOpera® Train** focuses on distributed training of large and foundational models.
- **TensorOpera® Federate** is a federated learning platform backed by the most popular federated learning open-source library and the world’s first FLOps (federated learning Ops), offering on-device training on smartphones and cross-cloud GPU servers.
- **TensorOpera® Open Source** is a unified and scalable machine learning library for running these AI jobs anywhere at any scale.

# Contributing
FedML embraces and thrives through open source. We welcome all kinds of contributions from the community. Kudos to all of <a href="https://github.com/fedml-ai/fedml/graphs/contributors" target="_blank">our amazing contributors</a>!
Binary file added docs/images/TensorOpera_arch.png
9 changes: 5 additions & 4 deletions python/fedml/api/modules/device.py
@@ -58,14 +58,15 @@ def _bind(
docker, docker_rank, infer_host,
redis_addr, redis_port, redis_password
):
fedml.load_env()
if os.getenv(ModuleConstants.ENV_FEDML_INFER_HOST) is None:
os.environ[ModuleConstants.ENV_FEDML_INFER_HOST] = infer_host
fedml.set_env_kv(ModuleConstants.ENV_FEDML_INFER_HOST, infer_host)
if os.getenv(ModuleConstants.ENV_FEDML_INFER_REDIS_ADDR) is None:
os.environ[ModuleConstants.ENV_FEDML_INFER_REDIS_ADDR] = redis_addr
fedml.set_env_kv(ModuleConstants.ENV_FEDML_INFER_REDIS_ADDR, redis_addr)
if os.getenv(ModuleConstants.ENV_FEDML_INFER_REDIS_PORT) is None:
os.environ[ModuleConstants.ENV_FEDML_INFER_REDIS_PORT] = redis_port
fedml.set_env_kv(ModuleConstants.ENV_FEDML_INFER_REDIS_PORT, redis_port)
if os.getenv(ModuleConstants.ENV_FEDML_INFER_REDIS_PASSWORD) is None:
os.environ[ModuleConstants.ENV_FEDML_INFER_REDIS_PASSWORD] = redis_password
fedml.set_env_kv(ModuleConstants.ENV_FEDML_INFER_REDIS_PASSWORD, redis_password)

url = fedml._get_backend_service()
platform_name = platform.system()
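The hunk above changes `_bind` so that the inference-gateway connection settings (inference host plus the Redis address, port, and password) are persisted with `fedml.set_env_kv` only when the corresponding environment variable is not already set; the standalone gateway later reads these same variables back in its `Settings` class further down. A minimal sketch of this "explicit environment wins over CLI default" pattern, using illustrative key names rather than the real `ModuleConstants` values:

```python
import os


def set_default_env(key: str, value: str) -> None:
    """Write `value` for `key` only if the variable is not already set.

    Sketch of the precedence used in `_bind` above: a value exported by the
    operator beforehand always wins over the CLI-provided default.
    """
    if os.getenv(key) is None:
        os.environ[key] = value          # visible to this process immediately
        # fedml.set_env_kv(key, value)   # in FedML, this would also persist it to the .env file


# Illustrative keys and values only (the real names come from ModuleConstants):
os.environ["FEDML_INFER_REDIS_ADDR"] = "10.0.0.5"       # pre-existing value is kept
set_default_env("FEDML_INFER_REDIS_ADDR", "127.0.0.1")  # no-op, already set
set_default_env("FEDML_INFER_REDIS_PORT", "6379")       # written, was unset
```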
1 change: 1 addition & 0 deletions python/fedml/computing/scheduler/env/collect_env.py
@@ -128,6 +128,7 @@ def load_env():


def set_env_kv(key, value):
os.environ[key] = value
env_config_file = get_env_file()
dotenv.set_key(env_config_file, key, value)
load_env()
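The one-line addition above makes `set_env_kv` update `os.environ` before persisting the key to the env file, so the new value is visible to the current process immediately rather than only after the next `load_env()`. A self-contained sketch of the same sequence with the `python-dotenv` package; the file path here is hypothetical, not FedML's actual location:

```python
import os
from pathlib import Path

import dotenv  # pip install python-dotenv

ENV_FILE = Path.home() / ".fedml_example" / "fedml.env"  # hypothetical path for this sketch


def set_env_kv(key: str, value: str) -> None:
    os.environ[key] = value                       # takes effect in this process right away
    ENV_FILE.parent.mkdir(parents=True, exist_ok=True)
    ENV_FILE.touch(exist_ok=True)
    dotenv.set_key(str(ENV_FILE), key, value)     # persist for future processes
    dotenv.load_dotenv(ENV_FILE, override=True)   # mirror the trailing load_env() call


set_env_kv("EXAMPLE_KEY", "example_value")
print(os.environ["EXAMPLE_KEY"])  # -> example_value, without restarting the process
```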
@@ -1,60 +1,50 @@
import argparse
import json
import logging
import time
import traceback
import os

from urllib.parse import urlparse
from typing import Any, Mapping, MutableMapping, Union
from urllib.parse import urlparse

from fastapi import FastAPI, Request, Response, status
from fastapi.responses import StreamingResponse, JSONResponse

import fedml
from fedml.api.modules.constants import ModuleConstants
from fedml.computing.scheduler.model_scheduler.device_client_constants import ClientConstants
from fedml.computing.scheduler.model_scheduler.device_http_inference_protocol import FedMLHttpInference
from fedml.computing.scheduler.model_scheduler.device_server_constants import ServerConstants
from fedml.computing.scheduler.model_scheduler.device_model_monitor import FedMLModelMetrics
from fedml.computing.scheduler.model_scheduler.device_model_cache import FedMLModelCache
from fedml.computing.scheduler.model_scheduler.device_mqtt_inference_protocol import FedMLMqttInference
from fedml.computing.scheduler.model_scheduler.device_http_proxy_inference_protocol import FedMLHttpProxyInference
from fedml.computing.scheduler.comm_utils import sys_utils

try:
from pydantic import BaseSettings
except Exception as e:
pass
try:
from pydantic_settings import BaseSettings
except Exception as e:
pass
from fedml.core.mlops.mlops_configs import MLOpsConfigs
from fedml.core.mlops import MLOpsRuntimeLog, MLOpsRuntimeLogDaemon


class Settings:
redis_addr = "127.0.0.1"
redis_port = 6379
redis_password = "fedml_default"
end_point_name = ""
model_name = ""
model_version = ""
model_infer_url = "127.0.0.1"
version = "dev"
use_mqtt_inference = False
use_worker_gateway = False
ext_info = "2b34303961245c4f175f2236282d7a272c040b0904747579087f6a760112030109010c215d54505707140005190a051c347f365c4a430c020a7d39120e26032a78730f797f7c031f0901657e75"
server_name = "DEVICE_INFERENCE_GATEWAY"
fedml.load_env()
redis_addr = os.getenv(ModuleConstants.ENV_FEDML_INFER_REDIS_ADDR)
redis_port = os.getenv(ModuleConstants.ENV_FEDML_INFER_REDIS_PORT)
redis_password = os.getenv(ModuleConstants.ENV_FEDML_INFER_REDIS_PASSWORD)
model_infer_host = os.getenv(ModuleConstants.ENV_FEDML_INFER_HOST)
version = fedml.get_env_version()
mqtt_config = MLOpsConfigs.fetch_mqtt_config()


api = FastAPI()

FEDML_MODEL_CACHE = FedMLModelCache.get_instance()
FEDML_MODEL_CACHE.set_redis_params(
redis_addr=Settings.redis_addr,
redis_port=Settings.redis_port,
redis_password=Settings.redis_password)
FEDML_MODEL_CACHE.set_redis_params(redis_addr=Settings.redis_addr,
redis_port=Settings.redis_port,
redis_password=Settings.redis_password)


@api.middleware("http")
async def auth_middleware(request: Request, call_next):

if "/inference" in request.url.path or "/api/v1/predict" in request.url.path:
try:
# Attempt to parse the JSON body.
@@ -94,6 +84,11 @@ async def auth_middleware(request: Request, call_next):
return response


@api.on_event("startup")
async def startup_event():
configure_logging()


@api.get('/')
async def root():
return {'message': 'TensorOpera Inference Service!'}
@@ -140,7 +135,7 @@ async def predict_openai(end_point_id, request: Request):
try:
response = await _predict(end_point_id, input_json, header)
except Exception as e:
response = {"error": True, "message": f"{traceback.format_exc()}"}
response = {"error": True, "message": f"{traceback.format_exc()}, exception {e}"}

return response

@@ -176,7 +171,6 @@ async def _predict(
input_json,
header=None
) -> Union[MutableMapping[str, Any], Response, StreamingResponse]:

# Always increase the pending requests counter on a new incoming request.
FEDML_MODEL_CACHE.update_pending_requests_counter(increase=True)
inference_response = {}
@@ -224,7 +218,7 @@
# Start timing for model metrics
model_metrics = FedMLModelMetrics(end_point_id, in_end_point_name,
model_id, in_model_name, model_version,
Settings.model_infer_url,
Settings.model_infer_host,
Settings.redis_addr,
Settings.redis_port,
Settings.redis_password,
@@ -283,7 +277,8 @@ def retrieve_info_by_endpoint_id(end_point_id, in_end_point_name=None, in_model_
model_name = ""
if in_end_point_name is not None:
end_point_name = in_end_point_name
model_name = redis_key[len(f"{FedMLModelCache.FEDML_MODEL_DEPLOYMENT_STATUS_TAG}-{end_point_id}-{in_end_point_name}-"):]
model_name = redis_key[
len(f"{FedMLModelCache.FEDML_MODEL_DEPLOYMENT_STATUS_TAG}-{end_point_id}-{in_end_point_name}-"):]
else:
# e.g. FEDML_MODEL_DEPLOYMENT_STATUS--1234-dummy_endpoint_name-dummy_model_name
try:
@@ -308,7 +303,7 @@ def found_idle_inference_device(end_point_id, end_point_name, in_model_name, in_
inference_output_url = ""
model_version = ""
# Found idle device (TODO: optimize the algorithm to search best device for inference)
payload, idle_device = FEDML_MODEL_CACHE.\
payload, idle_device = FEDML_MODEL_CACHE. \
get_idle_device(end_point_id, end_point_name, in_model_name, in_model_version)
if payload is not None:
logging.info("found idle deployment result {}".format(payload))
@@ -328,7 +323,6 @@ def found_idle_inference_device(end_point_id, end_point_name, in_model_name, in_

async def send_inference_request(idle_device, end_point_id, inference_url, input_list, output_list,
inference_type="default", has_public_ip=True):

request_timeout_sec = FEDML_MODEL_CACHE.get_endpoint_settings(end_point_id) \
.get("request_timeout_sec", ClientConstants.INFERENCE_REQUEST_TIMEOUT)

@@ -367,16 +361,7 @@ async def send_inference_request(idle_device, end_point_id, inference_url, input
return inference_response

if not has_public_ip:
connect_str = "@FEDML@"
random_out = sys_utils.random2(Settings.ext_info, "FEDML@9999GREAT")
config_list = random_out.split(connect_str)
agent_config = dict()
agent_config["mqtt_config"] = dict()
agent_config["mqtt_config"]["BROKER_HOST"] = config_list[0]
agent_config["mqtt_config"]["BROKER_PORT"] = int(config_list[1])
agent_config["mqtt_config"]["MQTT_USER"] = config_list[2]
agent_config["mqtt_config"]["MQTT_PWD"] = config_list[3]
agent_config["mqtt_config"]["MQTT_KEEPALIVE"] = int(config_list[4])
agent_config = {"mqtt_config": Settings.mqtt_config}
mqtt_inference = FedMLMqttInference(
agent_config=agent_config,
run_id=end_point_id)
@@ -410,18 +395,16 @@
def auth_request_token(end_point_id, end_point_name, model_name, token):
if token is None:
return False

cached_token = FEDML_MODEL_CACHE.\
cached_token = FEDML_MODEL_CACHE. \
get_end_point_token(end_point_id, end_point_name, model_name)
if cached_token is not None and str(cached_token) == str(token):
return True

return False


def is_endpoint_activated(end_point_id):
if end_point_id is None:
return False

activated = FEDML_MODEL_CACHE.get_end_point_activation(end_point_id)
return activated

@@ -432,13 +415,33 @@ def logging_inference_request(request, response):

try:
log_dir = ServerConstants.get_log_file_dir()
if not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
inference_log_file = os.path.join(log_dir, "inference.log")
with open(inference_log_file, "a") as f:
f.writelines([f"request: {request}, response: {response}\n"])
except Exception as ex:
logging.info("failed to log inference request and response to file.")
logging.info(f"failed to log inference request and response to file with exception {ex}")


def configure_logging():
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
args = parser.parse_args([])

setattr(args, "log_file_dir", ServerConstants.get_log_file_dir())
setattr(args, "run_id", -1)
setattr(args, "role", "server")
setattr(args, "using_mlops", True)
setattr(args, "config_version", fedml.get_env_version())

runner_info = ServerConstants.get_runner_infos()
if not (runner_info and "edge_id" in runner_info):
raise Exception("Inference gateway couldn't be started as edge_id couldn't be parsed from runner_infos.yaml")
setattr(args, "edge_id", int(runner_info.get("edge_id")))

MLOpsRuntimeLog.get_instance(args).init_logs(log_level=logging.INFO)
MLOpsRuntimeLogDaemon.get_instance(args).start_log_processor(log_run_id=args.run_id, log_device_id=args.edge_id,
log_source=Settings.server_name,
log_file_prefix=Settings.server_name)
logging.info("start the log processor for inference gateway")


if __name__ == "__main__":
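With this change the gateway configures MLOps logging at FastAPI startup: `configure_logging` builds a minimal args object, reads `edge_id` from `runner_infos.yaml` (failing fast if it is missing), initializes `MLOpsRuntimeLog`, and starts the log-processor daemon so gateway logs are shipped under the server name configured in `Settings.server_name`. The repeated `setattr` calls can be sketched more compactly with a plain `argparse.Namespace`; this is only an illustration of the pattern, not the project's code:

```python
import argparse


def build_log_args(log_file_dir: str, edge_id: int, config_version: str) -> argparse.Namespace:
    """Sketch of the argument object assembled in configure_logging() above."""
    return argparse.Namespace(
        log_file_dir=log_file_dir,
        run_id=-1,                   # the gateway is not tied to a single training run
        role="server",
        using_mlops=True,
        config_version=config_version,
        edge_id=edge_id,
    )


# Hypothetical usage; real values come from ServerConstants and runner_infos.yaml.
args = build_log_args("/tmp/fedml_logs", edge_id=201, config_version="release")
print(args.edge_id, args.role)
```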
@@ -10,7 +10,6 @@

import asyncio

from ..comm_utils.constants import SchedulerConstants
from ....core.distributed.communication.mqtt.mqtt_manager import MqttManager
from .device_http_inference_protocol import FedMLHttpInference

@@ -5,6 +5,7 @@
import subprocess
import sys
from os.path import expanduser
from pathlib import Path

import psutil
import yaml
@@ -329,9 +330,23 @@ def save_bootstrap_process(run_id, process_id):
run_id, process_id, ServerConstants.get_data_dir(), ServerConstants.LOCAL_RUNNER_INFO_DIR_NAME,
info_file_prefix=SchedulerConstants.RUN_PROCESS_TYPE_BOOTSTRAP_PROCESS)

@staticmethod
def get_runner_infos():
local_pkg_data_dir = ServerConstants.get_data_dir()
os.makedirs(local_pkg_data_dir, exist_ok=True)
os.makedirs(os.path.join(local_pkg_data_dir, ServerConstants.LOCAL_RUNNER_INFO_DIR_NAME), exist_ok=True)

runner_info_file = os.path.join(local_pkg_data_dir, ServerConstants.LOCAL_RUNNER_INFO_DIR_NAME,
"runner_infos.yaml")
runner_info = {}
try:
runner_info = yaml.safe_load(Path(runner_info_file).read_text())
except Exception as e:
logging.error(f"Failed to parse runner info: {e}")
return runner_info

@staticmethod
def save_runner_infos(unique_device_id, edge_id, run_id=None):
home_dir = expanduser("~")
local_pkg_data_dir = ServerConstants.get_data_dir()
os.makedirs(local_pkg_data_dir, exist_ok=True)
os.makedirs(os.path.join(local_pkg_data_dir, ServerConstants.LOCAL_RUNNER_INFO_DIR_NAME), exist_ok=True)
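The new `get_runner_infos` above is the counterpart of `save_runner_infos`: it loads `runner_infos.yaml` from the local runner-info directory under `ServerConstants.get_data_dir()`, and the gateway refuses to start if the file cannot be parsed or lacks `edge_id`. A hedged sketch of reading such a file; the example contents in the comment are illustrative, and only the `edge_id` key is actually required by `configure_logging`:

```python
import logging
from pathlib import Path

import yaml

# Illustrative runner_infos.yaml contents (field names other than edge_id are assumptions):
#
#   unique_device_id: "0xABC123@Linux.Edge.Device"
#   edge_id: 201
#
RUNNER_INFO_FILE = Path("runner_infos.yaml")  # the real file lives under ServerConstants.get_data_dir()


def read_edge_id(path: Path) -> int:
    try:
        runner_info = yaml.safe_load(path.read_text()) or {}
    except Exception as e:
        logging.error(f"Failed to parse runner info: {e}")
        runner_info = {}
    if "edge_id" not in runner_info:
        raise Exception("Inference gateway couldn't be started as edge_id couldn't be parsed "
                        "from runner_infos.yaml")
    return int(runner_info["edge_id"])
```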
