Skip to content
This repository was archived by the owner on Feb 27, 2024. It is now read-only.

Commit 88a6b52

Browse files
authored
Merge pull request #1 from drewm-jpl/scale
Scale the number of Verdi worker nodes in the Kubernetes cluster
2 parents d71ffee + 9861e06 commit 88a6b52

File tree

3 files changed

+273
-36
lines changed

3 files changed

+273
-36
lines changed

app/main.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,25 @@
1+
import asyncio
12
import os
2-
from fastapi import Depends, FastAPI
3+
4+
from fastapi import FastAPI
35
from fastapi.middleware.cors import CORSMiddleware
4-
from fastapi.openapi.utils import get_openapi
56
from mangum import Mangum
67

78
from .routers import prewarm
89

9-
1010
app = FastAPI(
11-
title="Unity SPS REST API",
12-
version="0.0.1",
13-
description="Unity SPS Operations",
14-
root_path=f"/{os.environ.get('STAGE')}/" if "STAGE" in os.environ else None
11+
title="Unity SPS REST API",
12+
version="0.0.1",
13+
description="Unity SPS Operations",
14+
root_path=f"/{os.environ.get('STAGE')}/" if "STAGE" in os.environ else None,
1515
)
1616

17+
18+
@app.on_event("startup")
19+
async def startup_event():
20+
asyncio.create_task(prewarm.process_prewarm_queue())
21+
22+
1723
app.add_middleware(
1824
CORSMiddleware,
1925
allow_credentials=True,

app/routers/prewarm.py

Lines changed: 257 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,25 @@
1-
from fastapi import APIRouter
2-
from pydantic import BaseModel
1+
import asyncio
2+
import os
3+
import uuid
4+
from datetime import datetime
5+
from functools import wraps
6+
from typing import Dict, List
7+
8+
import boto3
9+
import botocore
10+
from fastapi import APIRouter, HTTPException
11+
from fastapi.responses import JSONResponse
12+
from kubernetes import client, config
13+
from pydantic import BaseModel, Field
14+
15+
# Load the Kubernetes configuration
16+
config.load_incluster_config()
17+
18+
REGION_NAME = os.environ.get("AWS_REGION_NAME")
19+
EKS_CLUSTER_NAME = os.environ.get("EKS_CLUSTER_NAME")
20+
VERDI_NODE_GROUP_NAME = os.environ.get("VERDI_NODE_GROUP_NAME")
21+
VERDI_DAEMONSET_NAMESPACE = os.environ.get("VERDI_DAEMONSET_NAMESPACE")
22+
VERDI_DAEMONSET_NAME = os.environ.get("VERDI_DAEMONSET_NAME")
323

424
router = APIRouter(
525
prefix="/sps",
@@ -12,45 +32,253 @@
1232
},
1333
)
1434

35+
1536
class PrewarmRequest(BaseModel):
1637
num_nodes: int
1738

39+
1840
class PrewarmResponse(BaseModel):
1941
success: bool
2042
message: str
21-
request_id: str
43+
prewarm_request_id: str = None
44+
45+
46+
class PrewarmRequestInfo(BaseModel):
47+
status: str
48+
last_update_timestamp: str = Field(
49+
default_factory=lambda: datetime.utcnow().isoformat()
50+
)
51+
num_nodes: int
52+
ready_nodes: int
53+
node_group_update: dict = Field(default=None)
54+
error: str = Field(default=None)
55+
56+
57+
prewarm_requests_lock = asyncio.Lock()
58+
prewarm_requests_queue: asyncio.Queue = asyncio.Queue()
59+
prewarm_requests: Dict[str, PrewarmRequestInfo] = {}
60+
61+
62+
class ReadyNodesResponse(BaseModel):
63+
ready_nodes: int
64+
65+
66+
class NodeGroupInfo(BaseModel):
67+
instance_types: List[str]
68+
desired_size: int
69+
min_size: int
70+
max_size: int
71+
ready_nodes: int
72+
2273

2374
class HealthCheckResponse(BaseModel):
2475
message: str
2576

2677

78+
def get_ready_nodes_in_daemonset() -> int:
79+
v1 = client.AppsV1Api()
80+
daemonset = v1.read_namespaced_daemon_set(
81+
VERDI_DAEMONSET_NAME, VERDI_DAEMONSET_NAMESPACE
82+
)
83+
return daemonset.status.number_ready
84+
85+
86+
async def scale_nodes(num_nodes: int, request_id: str):
87+
try:
88+
eks = boto3.client("eks", region_name=REGION_NAME)
89+
90+
ready_nodes = get_ready_nodes_in_daemonset()
91+
async with prewarm_requests_lock:
92+
prewarm_requests[request_id] = PrewarmRequestInfo(
93+
status="Running",
94+
num_nodes=num_nodes,
95+
ready_nodes=ready_nodes,
96+
)
97+
update_response = eks.update_nodegroup_config(
98+
clusterName=EKS_CLUSTER_NAME,
99+
nodegroupName=VERDI_NODE_GROUP_NAME,
100+
scalingConfig={"desiredSize": num_nodes},
101+
)
102+
node_group_update_id = update_response["update"]["id"]
103+
await asyncio.sleep(5)
104+
105+
while True:
106+
ready_nodes = get_ready_nodes_in_daemonset()
107+
describe_update_response = eks.describe_update(
108+
name=EKS_CLUSTER_NAME,
109+
nodegroupName=VERDI_NODE_GROUP_NAME,
110+
updateId=node_group_update_id,
111+
)
112+
async with prewarm_requests_lock:
113+
prewarm_requests[
114+
request_id
115+
].last_update_timestamp = datetime.utcnow().isoformat()
116+
prewarm_requests[request_id].ready_nodes = ready_nodes
117+
prewarm_requests[
118+
request_id
119+
].node_group_update = describe_update_response["update"]
120+
121+
if ready_nodes == num_nodes:
122+
async with prewarm_requests_lock:
123+
prewarm_requests[request_id].status = "Succeeded"
124+
break
125+
126+
await asyncio.sleep(5) # Check the DaemonSet status every 5 seconds
127+
128+
except Exception as e:
129+
async with prewarm_requests_lock:
130+
prewarm_requests[request_id].status = "Failed"
131+
prewarm_requests[request_id].error = str(e)
132+
133+
134+
def is_valid_num_nodes(func):
135+
@wraps(func)
136+
async def wrapper(req, *args, **kwargs):
137+
try:
138+
eks = boto3.client("eks", region_name=REGION_NAME)
139+
response = eks.describe_nodegroup(
140+
clusterName=EKS_CLUSTER_NAME,
141+
nodegroupName=VERDI_NODE_GROUP_NAME,
142+
)
143+
node_group = response["nodegroup"]
144+
current_desired_size = node_group["scalingConfig"]["desiredSize"]
145+
max_size = node_group["scalingConfig"]["maxSize"]
146+
min_size = node_group["scalingConfig"]["minSize"]
147+
148+
if req.num_nodes > max_size:
149+
message = f"Requested number of nodes ({req.num_nodes}) is larger than the node group's max size ({max_size})"
150+
return JSONResponse(
151+
status_code=422,
152+
content={"message": message},
153+
)
154+
elif req.num_nodes < min_size:
155+
message = f"Requested number of nodes ({req.num_nodes}) is smaller than the node group's min size ({min_size})"
156+
return JSONResponse(
157+
status_code=422,
158+
content={"message": message},
159+
)
160+
elif req.num_nodes == current_desired_size:
161+
message = f"Requested number of nodes ({req.num_nodes}) is already equal to the current desired size"
162+
return JSONResponse(
163+
status_code=422,
164+
content={"message": message},
165+
)
166+
else:
167+
return await func(req, *args, **kwargs)
168+
except botocore.exceptions.ClientError as e:
169+
message = (
170+
f"Error occurred while validating requested number of nodes: {str(e)}"
171+
)
172+
return JSONResponse(
173+
status_code=500,
174+
content={"message": message},
175+
)
176+
except Exception as e:
177+
message = f"Unexpected error occurred while validating requested number of nodes: {str(e)}"
178+
return JSONResponse(
179+
status_code=500,
180+
content={"message": message},
181+
)
182+
183+
return wrapper
184+
185+
186+
async def process_prewarm_queue():
187+
while True:
188+
request_info = await prewarm_requests_queue.get()
189+
num_nodes = request_info["num_nodes"]
190+
request_id = request_info["request_id"]
191+
await scale_nodes(num_nodes, request_id)
192+
prewarm_requests_queue.task_done()
193+
194+
27195
@router.post("/prewarm")
28-
async def create_prewarm_request(
29-
req: PrewarmRequest
30-
) -> PrewarmResponse:
31-
return {
32-
"success": True,
33-
"message": "Prewarm is not implemented, this request has no effect.",
34-
"request_id": f"{req.num_nodes}"
35-
}
36-
37-
38-
@router.get("/prewarm/{request_id}")
39-
async def get_prewarm_request(request_id: str) -> PrewarmResponse:
40-
return {
41-
"success": True,
42-
"message": f"Status for prewarm request ID {request_id}.",
43-
"request_id": request_id
44-
}
45-
46-
@router.delete("/prewarm/{request_id}")
47-
async def delete_prewarm_request(request_id: str) -> PrewarmResponse:
48-
return {
49-
"success": True,
50-
"message": f"Prewarm request ID {request_id} deleted.",
51-
"request_id": request_id
52-
}
196+
@is_valid_num_nodes
197+
async def create_prewarm_request(req: PrewarmRequest) -> PrewarmResponse:
198+
try:
199+
# Generate a unique request ID
200+
request_id = str(uuid.uuid4())
201+
ready_nodes = get_ready_nodes_in_daemonset()
202+
async with prewarm_requests_lock:
203+
prewarm_requests[request_id] = PrewarmRequestInfo(
204+
status="Accepted",
205+
num_nodes=req.num_nodes,
206+
ready_nodes=ready_nodes,
207+
)
208+
209+
# Add the request to the prewarm_requests_queue
210+
await prewarm_requests_queue.put(
211+
{
212+
"num_nodes": req.num_nodes,
213+
"request_id": request_id,
214+
}
215+
)
216+
217+
prewarm_response = PrewarmResponse(
218+
success=True,
219+
message=f"Prewarm request accepted with ID {request_id}",
220+
prewarm_request_id=request_id,
221+
)
222+
223+
except Exception as e:
224+
prewarm_response = PrewarmResponse(
225+
success=False,
226+
message=f"Unexpected error occurred while creating prewarm request: {str(e)}",
227+
)
228+
229+
return prewarm_response
230+
231+
232+
@router.get("/prewarm/{prewarm_request_id}")
233+
async def get_prewarm_status(prewarm_request_id: str) -> PrewarmRequestInfo:
234+
async with prewarm_requests_lock:
235+
if prewarm_request_id not in prewarm_requests:
236+
raise HTTPException(status_code=404, detail="Prewarm request not found")
237+
response = prewarm_requests[prewarm_request_id]
238+
return response
239+
240+
241+
@router.get("/ready-nodes")
242+
async def ready_nodes() -> ReadyNodesResponse:
243+
try:
244+
ready_nodes = get_ready_nodes_in_daemonset()
245+
ready_nodes_response = ReadyNodesResponse(ready_nodes=ready_nodes)
246+
except Exception as e:
247+
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
248+
249+
return ready_nodes_response
250+
251+
252+
@router.get("/node-group-info")
253+
async def get_node_group_info() -> NodeGroupInfo:
254+
eks = boto3.client("eks", region_name=REGION_NAME)
255+
try:
256+
response = eks.describe_nodegroup(
257+
clusterName=EKS_CLUSTER_NAME,
258+
nodegroupName=VERDI_NODE_GROUP_NAME,
259+
)
260+
node_group = response["nodegroup"]
261+
scaling_config = node_group["scalingConfig"]
262+
instance_types = node_group["instanceTypes"]
263+
desired_size = scaling_config["desiredSize"]
264+
min_size = scaling_config["minSize"]
265+
max_size = scaling_config["maxSize"]
266+
ready_nodes = get_ready_nodes_in_daemonset()
267+
node_group_info = NodeGroupInfo(
268+
instance_types=instance_types,
269+
desired_size=desired_size,
270+
min_size=min_size,
271+
max_size=max_size,
272+
ready_nodes=ready_nodes,
273+
)
274+
except botocore.exceptions.ClientError as e:
275+
raise HTTPException(
276+
status_code=500,
277+
detail=f"Error occurred while getting node group info: {str(e)}",
278+
)
279+
return node_group_info
280+
53281

54282
@router.get("/health-check")
55283
async def health_check() -> HealthCheckResponse:
56-
return {"message": "The U-SPS On-Demand API is running and accessible"}
284+
return {"message": "The U-SPS On-Demand API is running and accessible"}

requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,6 @@ starlette==0.25.0
2020
tomli==2.0.1
2121
typing_extensions==4.5.0
2222
uvicorn==0.20.0
23+
boto3==1.26.94
24+
botocore==1.29.94
25+
kubernetes==26.1.0

0 commit comments

Comments
 (0)