Skip to content

Commit

Permalink
Copilot review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheffl committed Nov 24, 2024
1 parent 0d389f7 commit a3e0e80
Showing 1 changed file with 11 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,15 @@ def rpcapi_v2(body: dict[str, Any]) -> APIResponse:
"""Handle Edge Worker API `/edge_worker/v1/rpcapi` endpoint for Airflow 2.10."""
# Note: Except the method map this _was_ a 100% copy of internal API module
# airflow.api_internal.endpoints.rpc_api_endpoint.internal_airflow_api()
# As of rework for FastAPI in Airflow 3.0, this is updated and to be removed in future.
# As of rework for FastAPI in Airflow 3.0, this is updated and to be removed in the future.
from flask import Response, request

try:
json_request_headers(
content_type=request.headers.get("Content-Type", ""), accept=request.headers.get("Accept", "")
)
auth = request.headers.get("Authorization", "")
json_rpc = body.get("jsonrpc", "")
method_name = body.get("method", "")
request_obj = JsonRpcRequest(method=method_name, jsonrpc=json_rpc, params=body.get("params"))
request_obj = JsonRpcRequest(method=body["method"], jsonrpc=body["jsonrpc"], params=body["params"])
jwt_token_authorization_rpc(request_obj, auth)
json_rpc_version(request_obj)
output_json = rpcapi(request_obj)
Expand All @@ -64,12 +62,9 @@ def register_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) ->
try:
auth = request.headers.get("Authorization", "")
jwt_token_authorization(request.path, auth)

state = body.get("state", "")
queues = body.get("queues", "")
sysinfo = body.get("sysinfo", "")
request_obj = WorkerStateBody(state=state, jobs_active=0, queues=queues, sysinfo=sysinfo)

request_obj = WorkerStateBody(
state=body["state"], jobs_active=0, queues=body["queues"], sysinfo=body["sysinfo"]
)
return register(worker_name, request_obj, session)
except HTTPException as e:
return e.to_response() # type: ignore[attr-defined]
Expand All @@ -83,13 +78,12 @@ def set_state_v2(worker_name: str, body: dict[str, Any], session=NEW_SESSION) ->
try:
auth = request.headers.get("Authorization", "")
jwt_token_authorization(request.path, auth)

state = body.get("state", "")
jobs_active = int(body.get("jobs_active", ""))
queues = body.get("queues", "")
sysinfo = body.get("sysinfo", "")
request_obj = WorkerStateBody(state=state, jobs_active=jobs_active, queues=queues, sysinfo=sysinfo)

request_obj = WorkerStateBody(
state=body["state"],
jobs_active=body["jobs_active"],
queues=body["queues"],
sysinfo=body["sysinfo"],
)
return set_state(worker_name, request_obj, session)
except HTTPException as e:
return e.to_response() # type: ignore[attr-defined]

0 comments on commit a3e0e80

Please sign in to comment.