-
Notifications
You must be signed in to change notification settings - Fork 0
/
domain.py
34 lines (28 loc) · 976 Bytes
/
domain.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import decimal
from dataclasses import dataclass, asdict
from datetime import datetime
from decimal import Decimal
from bson.decimal128 import create_decimal128_context, Decimal128
@dataclass(frozen=True)
class AssetDataEntry:
    """Immutable record of price and order-book figures for one asset.

    NOTE(review): the meaning of each field is inferred from its name only
    (this looks like an order-book snapshot); confirm against the code that
    populates the record.
    """

    # Price fields held as Decimal; ``to_dict`` converts them to Decimal128.
    mid_price: Decimal
    best_bid: Decimal
    best_ask: Decimal
    # Float aggregates. Presumably the ``_1`` / ``_4`` suffix is a depth
    # level or band -- TODO confirm with the producer of these values.
    total_ask_1: float
    total_ask_4: float
    total_bid_1: float
    total_bid_4: float
    book_bias_1: float
    book_bias_4: float
    # Timestamps; stored and returned by ``to_dict`` as given.
    last_book_update: datetime
    current_time: datetime

    def to_dict(self) -> dict:
        """Return the entry as a dict with price fields as ``Decimal128``.

        Every field is copied with ``dataclasses.asdict``. The values of
        ``mid_price``, ``best_bid`` and ``best_ask`` are then replaced by
        ``bson`` ``Decimal128`` instances; all other values are left as
        ``asdict`` produced them.

        Returns:
            A new dict keyed by field name.
        """
        data = asdict(self)
        # The Decimal128 context is identical for every field, so build and
        # enter it once instead of once per loop iteration.
        with decimal.localcontext(create_decimal128_context()) as ctx:
            # ``asdict`` always emits every declared field, so no membership
            # check is needed before the lookup.
            for name in ("mid_price", "best_bid", "best_ask"):
                # Normalise the value through the Decimal128 context before
                # wrapping it, as the bson documentation recommends.
                data[name] = Decimal128(ctx.create_decimal(data[name]))
        return data