-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathlive_smoke_test.py
executable file
·135 lines (105 loc) · 3.8 KB
/
live_smoke_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/python3
import argparse
import os
import typing
from databento import Dataset
from databento import Live
from databento import RecordFlags
from databento import Schema
from databento import SType
from databento_dbn import ErrorMsg
from databento_dbn import MBOMsg
from databento_dbn import RType
from databento_dbn import SymbolMappingMsg
def parse_args() -> argparse.Namespace:
    """
    Parse the command-line options of the live smoke test.

    Returns
    -------
    argparse.Namespace
        Namespace with the attributes ``gateway``, ``port``,
        ``api_key_env_var``, ``dataset``, ``schema``, ``stype``, ``symbols``,
        ``start`` and ``use_snapshot``.

    """
    parser = argparse.ArgumentParser(prog="Python client")

    # Connection options
    parser.add_argument("--gateway", type=str, help="Gateway to connect")
    parser.add_argument("--port", type=int, default=13000, help="Gateway port to connect")
    parser.add_argument(
        "--api-key-env-var",
        type=str,
        help="Name of the environment variable holding the API key",
        default="DATABENTO_API_KEY",
    )

    # Subscription options; the enum types make argparse reject unknown values
    parser.add_argument("--dataset", type=Dataset, help="Dataset")
    parser.add_argument("--schema", type=Schema, help="Schema")
    parser.add_argument("--stype", type=SType, help="SType")
    parser.add_argument("--symbols", type=str, help="Symbols")
    parser.add_argument("--start", type=str, default=None, help="Start time (RFC 3339)")
    parser.add_argument(
        "--use-snapshot",
        action="store_true",
        help="Whether or not to request snapshot subscription",
    )

    return parser.parse_args()
def run_client(args: argparse.Namespace) -> None:
    """
    Subscribe to the live gateway and read records until the expected one arrives.

    Every record is first checked with `is_expected_record`; a match ends the
    run. Any other record is printed, unless it is an `ErrorMsg`.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed command-line options (see `parse_args`).

    Raises
    ------
    ValueError
        If an `ErrorMsg` is received before the expected record, or if the
        API key environment variable is unset or empty.

    """
    api_key = get_api_key(args.api_key_env_var)
    live = Live(key=api_key, gateway=args.gateway, port=args.port)

    subscription = {
        "dataset": args.dataset,
        "schema": args.schema,
        "stype_in": args.stype,
        "symbols": args.symbols,
        "start": args.start,
    }
    live.subscribe(**subscription)

    print("Starting client...")

    for record in live:
        # The expected-record check deliberately runs before the error check
        if is_expected_record(args, record):
            print(f"Received expected record {record}")
            break
        if isinstance(record, ErrorMsg):
            raise ValueError(f"Received error {record.err}")
        print(f"{record}")

    print("Finished client")
def run_client_with_snapshot(args: argparse.Namespace) -> None:
    """
    Subscribe with ``snapshot=True`` and verify that a snapshot record is received.

    Symbol mapping records are skipped. MBO records carrying the
    ``F_SNAPSHOT`` flag are noted, and the loop stops at the first MBO record
    without that flag. The ``start`` option is not passed to the subscription.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed command-line options (see `parse_args`).

    Raises
    ------
    ValueError
        If an `ErrorMsg` or any record type other than `SymbolMappingMsg` or
        `MBOMsg` is received, or if the API key environment variable is unset
        or empty.
    AssertionError
        If the loop finished without seeing a snapshot-flagged MBO record.

    """
    client = Live(key=get_api_key(args.api_key_env_var), gateway=args.gateway, port=args.port)

    client.subscribe(
        dataset=args.dataset,
        schema=args.schema,
        stype_in=args.stype,
        symbols=args.symbols,
        snapshot=True,
    )

    received_snapshot_record = False

    print("Starting client...")

    for record in client:
        if isinstance(record, SymbolMappingMsg):
            continue
        elif isinstance(record, MBOMsg):
            if record.flags & RecordFlags.F_SNAPSHOT:
                received_snapshot_record = True
            else:
                # First non-snapshot MBO record: stop reading here
                print(f"Received expected record {record}")
                break
        elif isinstance(record, ErrorMsg):
            raise ValueError(f"Received error {record.err}")
        else:
            raise ValueError(f"Received unexpected record {record}")

    print("Finished client")

    # Raise explicitly instead of using `assert`: assert statements are removed
    # under `python -O`, which would let the smoke test pass without this check.
    if not received_snapshot_record:
        raise AssertionError("Did not receive any snapshot record")
def is_expected_record(args: argparse.Namespace, record: typing.Any) -> bool:
    """
    Return whether `record` is the record the smoke test is waiting for.

    A `SymbolMappingMsg` is expected when the input symbology is not
    ``SType.INSTRUMENT_ID`` and ``args.start`` is not the integer 0.
    Otherwise a record whose ``rtype`` matches ``args.schema`` is expected.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed command-line options; reads ``start``, ``stype`` and ``schema``.
    record : typing.Any
        The record received from the live client.

    Returns
    -------
    bool

    """
    try:
        start = int(args.start)
    except (TypeError, ValueError):
        # `--start` was omitted (None) or is not an integer string
        start = None

    # For start != 0 we stop at SymbolMappingMsg so that the tests can be run outside trading hours
    # (a `start` of None also compares unequal to 0)
    should_expect_symbol_mapping = args.stype != SType.INSTRUMENT_ID and start != 0

    if should_expect_symbol_mapping:
        return isinstance(record, SymbolMappingMsg)
    return record.rtype == RType.from_schema(args.schema)
def get_api_key(api_key_name: str) -> str:
    """
    Read the API key from the environment variable named `api_key_name`.

    Parameters
    ----------
    api_key_name : str
        Name of the environment variable to read.

    Returns
    -------
    str
        The non-empty value of the environment variable.

    Raises
    ------
    ValueError
        If the variable is unset or set to an empty string.

    """
    value = os.environ.get(api_key_name)
    if value:
        return value
    raise ValueError(f"Invalid api_key {api_key_name}")
def main() -> None:
    """
    Parse the command line and run the matching smoke test.

    Runs `run_client_with_snapshot` when ``--use-snapshot`` was given,
    otherwise `run_client`.
    """
    args = parse_args()
    runner = run_client_with_snapshot if args.use_snapshot else run_client
    runner(args)
# Run the smoke test only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()