-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumers.py
66 lines (49 loc) · 1.75 KB
/
consumers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import json
from fastapi import HTTPException
def create_delivery(state, event):
    """Build the initial state of a delivery from its creation event.

    Args:
        state: Previous state; not read by this consumer.
        event: Object exposing ``delivery_id`` and ``data``, where ``data``
            is a JSON string containing ``budget`` and ``notes``.

    Returns:
        A new state dict with status ``"ready"`` and an empty stock
        (``"quantity"`` of 0).

    Raises:
        KeyError: If ``budget`` or ``notes`` is missing from the payload.
        ValueError: If the payload is not valid JSON or ``budget`` cannot be
            converted to an integer.
    """
    data = json.loads(event.data)
    return {
        "id": event.delivery_id,
        "budget": int(data["budget"]),
        "notes": data["notes"],
        # The pickup/deliver consumers do arithmetic on state["quantity"];
        # seed it so a brand-new delivery starts with an explicit empty stock.
        "quantity": 0,
        "status": "ready",
    }
def start_delivery(state, event):
    """Move a delivery from ``"ready"`` to ``"active"``.

    Args:
        state: Current delivery state; its ``"status"`` must be ``"ready"``.
        event: The triggering event; not read by this consumer.

    Returns:
        A new dict equal to ``state`` with ``"status"`` set to ``"active"``.
        The input ``state`` is not modified.

    Raises:
        HTTPException: With status 400 when the delivery is not ``"ready"``.
    """
    if state["status"] == "ready":
        activated = {"status": "active"}
        return state | activated
    raise HTTPException(status_code=400, detail="Delivery already started")
def pickup_products(state, event):
    """Apply a product pickup: spend budget and add the items to stock.

    Args:
        state: Current delivery state. Must contain ``"budget"``;
            ``"quantity"`` is optional and counts as 0 when absent.
        event: Object whose ``data`` is a JSON string containing
            ``purchase_price`` and ``quantity``.

    Returns:
        A new dict based on ``state`` with the reduced ``"budget"``, the
        latest ``"purchase_price"``, the increased ``"quantity"`` and status
        ``"collected"``. The input ``state`` is not modified.

    Raises:
        HTTPException: With status 400 when the purchase costs more than the
            remaining budget.
    """
    data = json.loads(event.data)
    purchase_price = int(data["purchase_price"])
    picked = int(data["quantity"])

    new_budget = state["budget"] - purchase_price * picked
    if new_budget < 0:
        raise HTTPException(status_code=400, detail="Not enough budget")

    # A delivery that has never picked anything up may have no "quantity"
    # key yet; treat that as an empty stock instead of failing with KeyError.
    new_quantity = state.get("quantity", 0) + picked

    return state | {
        "budget": new_budget,
        "purchase_price": purchase_price,
        "quantity": new_quantity,
        "status": "collected",
    }
def deliver_products(state, event):
    """Apply a product delivery: remove items from stock and earn revenue.

    Args:
        state: Current delivery state. Must contain ``"budget"``;
            ``"quantity"`` is optional and counts as 0 when absent.
        event: Object whose ``data`` is a JSON string containing
            ``sell_price`` and ``quantity``.

    Returns:
        A new dict based on ``state`` with the increased ``"budget"``, the
        latest ``"sell_price"``, the reduced ``"quantity"`` and status
        ``"completed"``. The input ``state`` is not modified.

    Raises:
        HTTPException: With status 400 when more items are delivered than
            are currently in stock.
    """
    data = json.loads(event.data)
    sell_price = int(data["sell_price"])
    delivered = int(data["quantity"])

    # A delivery with no prior pickup may have no "quantity" key; treating it
    # as 0 lets the stock check below reject the request instead of KeyError.
    new_quantity = state.get("quantity", 0) - delivered
    if new_quantity < 0:
        raise HTTPException(status_code=400, detail="Not enough quantity")

    return state | {
        "budget": state["budget"] + sell_price * delivered,
        "sell_price": sell_price,
        "quantity": new_quantity,
        "status": "completed",
    }
def increase_budget(state, event):
    """Add funds to a delivery's budget.

    Args:
        state: Current delivery state; must contain ``"budget"``.
        event: Object whose ``data`` is a JSON string containing ``budget``,
            the amount to add.

    Returns:
        A new dict equal to ``state`` with ``"budget"`` increased by the
        event's amount.
    """
    data = json.loads(event.data)
    # Build a fresh dict like the other consumers do, rather than mutating
    # the caller's state in place.
    return state | {"budget": state["budget"] + int(data["budget"])}
# Dispatch table: event type name -> consumer function. Every consumer takes
# (state, event) and returns the resulting state.
CONSUMERS = dict(
    CREATE_DELIVERY=create_delivery,
    START_DELIVERY=start_delivery,
    PICKUP_PRODUCTS=pickup_products,
    DELIVER_PRODUCTS=deliver_products,
    INCREASE_BUDGET=increase_budget,
)