-
Notifications
You must be signed in to change notification settings - Fork 283
Workflow executor example workflow API #1102
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
727e9f3
87ed7db
bc5cc12
03f0d01
11344a6
00de5a8
7f8f957
14944f9
891324f
f8f3afd
62fd863
d251aa5
bfb0522
b784a68
77675ce
3745142
9b14045
07d4ed4
683654b
0098bf1
256dfbd
e4e8a70
0523b68
cbc87e7
8bb3f30
219cab2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
#!/bin/bash | ||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
set -e | ||
|
||
WORKPATH=$(dirname "$PWD") | ||
vllm_port=${vllm_port} | ||
[[ -z "$vllm_port" ]] && vllm_port=8084 | ||
export WORKDIR=$WORKPATH/../../ | ||
echo "WORKDIR=${WORKDIR}" | ||
export SDK_BASE_URL=$1 | ||
echo "SDK_BASE_URL=$1" | ||
export SERVING_TOKEN=${SERVING_TOKEN} | ||
export HF_TOKEN=${HUGGINGFACEHUB_API_TOKEN} | ||
export llm_engine=vllm | ||
export ip_address=$(hostname -I | awk '{print $1}') | ||
export llm_endpoint_url=http://${ip_address}:${vllm_port} | ||
export model=mistralai/Mistral-7B-Instruct-v0.3 | ||
export recursion_limit=25 | ||
export temperature=0 | ||
export max_new_tokens=1000 | ||
export TOOLSET_PATH=$WORKDIR/GenAIExamples/WorkflowExecAgent/tools/ | ||
|
||
function start_agent() { | ||
echo "Starting Agent services" | ||
cd $WORKDIR/GenAIExamples/WorkflowExecAgent/docker_compose/intel/cpu/xeon | ||
WORKDIR=$WORKPATH/docker_image_build/ docker compose -f compose_vllm.yaml up -d | ||
echo "Waiting agent service ready" | ||
sleep 5s | ||
} | ||
|
||
function main() { | ||
echo "==================== Start agent service ====================" | ||
start_agent | ||
echo "==================== Agent service started ====================" | ||
} | ||
|
||
main |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
#!/bin/bash | ||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
set -e | ||
|
||
wf_api_port=${wf_api_port} | ||
[[ -z "$wf_api_port" ]] && wf_api_port=5005 | ||
WORKPATH=$(dirname "$PWD") | ||
LOG_PATH="$WORKPATH/tests/example_workflow" | ||
export WORKDIR=$WORKPATH/../../ | ||
echo "WORKDIR=${WORKDIR}" | ||
|
||
function start_example_workflow_api() { | ||
echo "Starting example workflow API" | ||
cd $WORKDIR/GenAIExamples/WorkflowExecAgent/tests/example_workflow | ||
docker build -f Dockerfile.example_workflow_api -t example-workflow-service . | ||
docker run -d -p ${wf_api_port}:${wf_api_port} --rm --network=host --name example-workflow-service -it example-workflow-service | ||
echo "Waiting example workflow API ready" | ||
until [[ "$n" -ge 100 ]] || [[ $ready == true ]]; do | ||
docker logs example-workflow-service &> ${LOG_PATH}/example-workflow-service.log | ||
n=$((n+1)) | ||
if grep -q "Uvicorn running on" ${LOG_PATH}/example-workflow-service.log; then | ||
break | ||
fi | ||
if grep -q "No such container" ${LOG_PATH}/example-workflow-service.log; then | ||
echo "container example-workflow-service not found" | ||
exit 1 | ||
fi | ||
sleep 5s | ||
done | ||
} | ||
|
||
function main() { | ||
echo "==================== Start example workflow API ====================" | ||
start_example_workflow_api | ||
echo "==================== Example workflow API started ====================" | ||
} | ||
|
||
main |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
#!/bin/bash | ||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
set -e | ||
|
||
WORKPATH=$(dirname "$PWD") | ||
export WORKDIR=$WORKPATH/../../ | ||
echo "WORKDIR=${WORKDIR}" | ||
export ip_address=$(hostname -I | awk '{print $1}') | ||
query=$1 | ||
validate_result=$2 | ||
|
||
function validate() { | ||
local CONTENT="$1" | ||
local EXPECTED_RESULT="$2" | ||
local SERVICE_NAME="$3" | ||
|
||
if echo "$CONTENT" | grep -q "$EXPECTED_RESULT"; then | ||
echo "[ $SERVICE_NAME ] Content is as expected: $CONTENT" | ||
echo "[TEST INFO]: Workflow Executor agent service PASSED" | ||
else | ||
echo "[ $SERVICE_NAME ] Content does not match the expected result: $CONTENT" | ||
echo "[TEST INFO]: Workflow Executor agent service FAILED" | ||
fi | ||
} | ||
|
||
function validate_agent_service() { | ||
echo "----------------Test agent ----------------" | ||
local CONTENT=$(curl http://${ip_address}:9091/v1/chat/completions -X POST -H "Content-Type: application/json" -d '{ | ||
"messages": "'"${query}"'" | ||
}') | ||
validate "$CONTENT" "$validate_result" "workflowexec-agent-endpoint" | ||
docker logs workflowexec-agent-endpoint | ||
} | ||
|
||
function main() { | ||
echo "==================== Validate agent service ====================" | ||
validate_agent_service | ||
echo "==================== Agent service validated ====================" | ||
} | ||
|
||
main |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
FROM ubuntu:22.04 | ||
|
||
RUN apt-get -qq update | ||
|
||
WORKDIR /home/ubuntu | ||
|
||
COPY launch_workflow_service.sh requirements.txt main.py workflow.py ./ | ||
|
||
RUN chmod +x ./launch_workflow_service.sh | ||
|
||
CMD ["./launch_workflow_service.sh"] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
#!/bin/bash | ||
|
||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
apt-get -qq -y install --no-install-recommends unzip curl ca-certificates | ||
apt-get -qq -y install --no-install-recommends python3 python3-pip | ||
|
||
curl -L -o ./archive.zip https://www.kaggle.com/api/v1/datasets/download/blastchar/telco-customer-churn | ||
unzip ./archive.zip -d ./ | ||
rm ./archive.zip | ||
|
||
pip install virtualenv && \ | ||
virtualenv venv && \ | ||
source venv/bin/activate && \ | ||
pip install -r requirements.txt | ||
|
||
uvicorn main:app --reload --port=5005 --host=0.0.0.0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
import logging | ||
|
||
from fastapi import APIRouter, FastAPI | ||
from workflow import run_workflow | ||
|
||
logger = logging.getLogger(__name__) | ||
logger.setLevel(logging.DEBUG) | ||
|
||
app = FastAPI() | ||
|
||
router = APIRouter(prefix="/serving", tags=["Workflow Serving"]) | ||
|
||
app.results = {} | ||
|
||
|
||
@router.post("/servable_workflows/{wf_id}/start", summary="Start Workflow") | ||
async def start_workflow(wf_id: int, params: dict): | ||
try: | ||
app.results = run_workflow(params["params"]) | ||
wf_key = "example_key" | ||
return {"msg": "ok", "wf_key": wf_key} | ||
|
||
except Exception as e: | ||
logging.error(e, exc_info=True) | ||
return {"msg": "error occurred"} | ||
|
||
|
||
@router.get("/serving_workflows/{wf_key}/status", summary="Get Workflow Status") | ||
async def get_status(wf_key: str): | ||
try: | ||
if app.results: | ||
status = "finished" | ||
else: | ||
status = "failed" | ||
|
||
return {"workflow_status": status} | ||
except Exception as e: | ||
logging.error(e) | ||
return {"msg": "error occurred"} | ||
|
||
|
||
@router.get("/serving_workflows/{wf_key}/results", summary="Get Workflow Results") | ||
async def get_results(wf_key: str): | ||
try: | ||
if app.results: | ||
return app.results | ||
else: | ||
return {"msg": "There is an issue while getting results !!"} | ||
|
||
except Exception as e: | ||
logging.error(e) | ||
return {"msg": "There is an issue while getting results !!"} | ||
|
||
|
||
app.include_router(router) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
fastapi | ||
numpy | ||
pandas | ||
requests==2.28.1 | ||
scikit-learn | ||
uvicorn | ||
xgboost | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please check with Suyue if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @lvliang-intel, Suyue has confirmed xgboost is in the BOM list |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
import json | ||
import warnings | ||
|
||
warnings.filterwarnings("ignore") | ||
|
||
import pandas as pd | ||
from sklearn.ensemble import AdaBoostClassifier, GradientBoostingClassifier, RandomForestClassifier | ||
from sklearn.linear_model import LogisticRegression | ||
from sklearn.metrics import accuracy_score | ||
from sklearn.model_selection import GridSearchCV, train_test_split | ||
from sklearn.naive_bayes import GaussianNB | ||
from sklearn.neighbors import KNeighborsClassifier | ||
from sklearn.pipeline import Pipeline | ||
from sklearn.preprocessing import LabelEncoder, MinMaxScaler | ||
from sklearn.svm import SVC | ||
from sklearn.tree import DecisionTreeClassifier | ||
from xgboost import XGBClassifier | ||
|
||
|
||
def churn_prediction(params): | ||
df = pd.read_csv("./WA_Fn-UseC_-Telco-Customer-Churn.csv") | ||
|
||
# Data Cleaning | ||
df = df.drop(["customerID"], axis=1) | ||
select_cols = ["gender", "tenure", "MonthlyCharges", "TotalCharges", "Churn"] | ||
df = df[select_cols] | ||
|
||
df["TotalCharges"] = pd.to_numeric(df.TotalCharges, errors="coerce") | ||
df.drop(labels=df[df["tenure"] == 0].index, axis=0, inplace=True) | ||
df.fillna(df["TotalCharges"].mean()) | ||
|
||
# Data Preprocessing | ||
encoders = {} | ||
|
||
def object_to_int(dataframe_series): | ||
if dataframe_series.dtype == "object": | ||
encoders[dataframe_series.name] = LabelEncoder().fit(dataframe_series) | ||
dataframe_series = encoders[dataframe_series.name].transform(dataframe_series) | ||
return dataframe_series | ||
|
||
df = df.apply(lambda x: object_to_int(x)) | ||
X = df.drop(columns=["Churn"]) | ||
y = df["Churn"].values | ||
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, random_state=40, stratify=y) | ||
|
||
model_scores = [] | ||
|
||
models = [ | ||
( | ||
"Random Forest", | ||
RandomForestClassifier(random_state=42), | ||
{"model__n_estimators": [50, 100, 200], "model__max_depth": [None, 10, 20]}, | ||
), # Add hyperparameters for Random Forest | ||
( | ||
"Gradient Boosting", | ||
GradientBoostingClassifier(random_state=42), | ||
{"model__n_estimators": [50, 100, 200], "model__learning_rate": [0.05, 0.1, 0.5]}, | ||
), # Add hyperparameters for Gradient Boosting | ||
( | ||
"Support Vector Machine", | ||
SVC(random_state=42, class_weight="balanced"), | ||
{"model__C": [0.1, 1, 10], "model__gamma": ["scale", "auto"]}, | ||
), # Add hyperparameters for SVM | ||
( | ||
"Logistic Regression", | ||
LogisticRegression(random_state=42, class_weight="balanced"), | ||
{"model__C": [0.1, 1, 10], "model__penalty": ["l1", "l2"]}, | ||
), # Add hyperparameters for Logistic Regression | ||
( | ||
"K-Nearest Neighbors", | ||
KNeighborsClassifier(), | ||
{"model__n_neighbors": [3, 5, 7], "model__weights": ["uniform", "distance"]}, | ||
), # Add hyperparameters for KNN | ||
( | ||
"Decision Tree", | ||
DecisionTreeClassifier(random_state=42), | ||
{"model__max_depth": [None, 10, 20], "model__min_samples_split": [2, 5, 10]}, | ||
), # Add hyperparameters for Decision Tree | ||
( | ||
"Ada Boost", | ||
AdaBoostClassifier(random_state=42), | ||
{"model__n_estimators": [50, 100, 200], "model__learning_rate": [0.05, 0.1, 0.5]}, | ||
), # Add hyperparameters for Ada Boost | ||
( | ||
"XG Boost", | ||
XGBClassifier(random_state=42), | ||
{"model__n_estimators": [50, 100, 200], "model__learning_rate": [0.05, 0.1, 0.5]}, | ||
), # Add hyperparameters for XG Boost | ||
("Naive Bayes", GaussianNB(), {}), # No hyperparameters for Naive Bayes | ||
] | ||
|
||
best_model = None | ||
best_accuracy = 0.0 | ||
|
||
for name, model, param_grid in models: | ||
# Create a pipeline for each model | ||
pipeline = Pipeline([("scaler", MinMaxScaler()), ("model", model)]) # Feature Scaling | ||
|
||
# Hyperparameter tuning using GridSearchCV | ||
if param_grid: | ||
grid_search = GridSearchCV(pipeline, param_grid, cv=2) | ||
grid_search.fit(X_train, y_train) | ||
pipeline = grid_search.best_estimator_ | ||
|
||
# Fit the pipeline on the training data | ||
pipeline.fit(X_train, y_train) | ||
|
||
# Make predictions on the test data | ||
y_pred = pipeline.predict(X_test) | ||
|
||
# Calculate accuracy score | ||
accuracy = accuracy_score(y_test, y_pred) | ||
|
||
# Append model name and accuracy to the list | ||
model_scores.append({"Model": name, "Accuracy": accuracy}) | ||
|
||
# Convert the list to a DataFrame | ||
scores_df = pd.DataFrame(model_scores) | ||
print("Model:", name) | ||
print("Test Accuracy:", round(accuracy, 3), "%\n") | ||
|
||
# Check if the current model has the best accuracy | ||
if accuracy > best_accuracy: | ||
best_accuracy = accuracy | ||
best_model = pipeline | ||
|
||
# Retrieve the overall best model | ||
print("Best Model:") | ||
print("Test Accuracy:", best_accuracy) | ||
print("Model Pipeline:", best_model, "with accuracy", round(best_accuracy, 2), "%\n") | ||
|
||
# Process and Predict input values from user | ||
def transform_params(name, value): | ||
return encoders[name].transform(value)[0] | ||
|
||
predict_input = {} | ||
print("Predicting with user provided params:", params) | ||
for key, value in params.items(): | ||
if key in encoders.keys(): | ||
predict_input[key] = transform_params(key, [value]) | ||
else: | ||
predict_input[key] = value | ||
|
||
predict_input = pd.DataFrame([predict_input]) | ||
result = best_model.predict(predict_input) | ||
params["prediction"] = encoders["Churn"].inverse_transform(result)[0] | ||
result = json.dumps(params) | ||
|
||
return result | ||
|
||
|
||
def run_workflow(params): | ||
return churn_prediction(params) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
#!/bin/bash | ||
# Copyright (C) 2024 Intel Corporation | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
wf_api_port=${wf_api_port} | ||
[[ -z "$wf_api_port" ]] && wf_api_port=5005 && export wf_api_port=5005 | ||
api_server_url=http://$(hostname -I | awk '{print $1}'):${wf_api_port}/ | ||
workflow_id=10071 | ||
query="I have a data with gender Female, tenure 55, MonthlyCharges 103.7, TotalCharges 1840.75. Predict if this entry will churn. My workflow id is ${workflow_id}." | ||
validate_result="the prediction is No" | ||
|
||
function stop_agent_and_api_server() { | ||
echo "Stopping Agent services" | ||
docker rm --force $(docker ps -a -q --filter="name=workflowexec-agent-endpoint") | ||
docker rm --force $(docker ps -a -q --filter="name=example-workflow-service") | ||
} | ||
|
||
function stop_vllm_docker() { | ||
cid=$(docker ps -aq --filter "name=test-comps-vllm-service") | ||
echo "Stopping the docker containers "${cid} | ||
if [[ ! -z "$cid" ]]; then docker rm $cid -f && sleep 1s; fi | ||
echo "Docker containers stopped successfully" | ||
} | ||
|
||
echo "=================== #1 Building docker images ====================" | ||
bash 1_build_images.sh | ||
echo "=================== #1 Building docker images completed ====================" | ||
|
||
echo "=================== #2 Start vllm service ====================" | ||
bash 2_start_vllm_service.sh | ||
echo "=================== #2 Start vllm service completed ====================" | ||
|
||
echo "=================== #3 Start agent service ====================" | ||
bash 3_launch_agent_service.sh $api_server_url | ||
echo "=================== #3 Agent service started ====================" | ||
|
||
echo "=================== #4 Start example workflow API ====================" | ||
bash 3_launch_example_wf_api.sh | ||
echo "=================== #4 Example workflow API started ====================" | ||
|
||
echo "=================== #5 Start validate agent ====================" | ||
bash 4_validate_agent.sh "$query" "$validate_result" | ||
echo "=================== #5 Validate agent completed ====================" | ||
|
||
echo "=================== #4 Stop all services ====================" | ||
stop_agent_and_api_server | ||
stop_vllm_docker | ||
echo "=================== #4 All services stopped ====================" | ||
|
||
echo "ALL DONE!" |
Uh oh!
There was an error while loading. Please reload this page.