-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplaceholder_coordinator.py
49 lines (35 loc) · 1.06 KB
/
placeholder_coordinator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from fastapi import FastAPI, Request
from pydantic import BaseModel
import json, uvicorn, httpx
from rid_lib.ext import Event, CacheBundle
server = FastAPI()
sensors = {}
@server.post("/events/publish")
async def publish_event(request: Request):
print(request.url)
data = await request.json()
print(json.dumps(data, indent=2))
event = Event.from_json(data)
print(event)
remote_sensor = sensors[event.rid.context]
resp = httpx.get(remote_sensor + f"/object?rid={event.rid}")
bundle = CacheBundle.from_json(resp.json())
print(bundle)
return "success"
# @server.post("/events/subscribe")
# async def subscribe_event()
class Sensor(BaseModel):
url: str
contexts: list[str]
@server.post("/sensors")
async def register_sensor(sensor: Sensor):
for context in sensor.contexts:
sensors[context] = sensor.url
return sensors
if __name__ == "__main__":
uvicorn.run(
"placeholder_coordinator:server",
reload=True,
log_level="debug",
port=5000
)