-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.py
43 lines (31 loc) · 1.23 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from threading import Thread
from typing import Callable, Tuple
from config import Config
from http.server import SimpleHTTPRequestHandler
import socketserver
def base_url(addr: str, port: int) -> str:
return f"http://{addr}:{port}"
class Server(socketserver.TCPServer):
allow_reuse_address = True
def serve_forever(addr: str, port: int, config: Config):
class Handler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=config.output_path, **kwargs)
with Server((addr, port), Handler) as httpd:
print(f"Serving at {base_url(addr, port)}")
httpd.serve_forever()
def serve_until(
addr: str, port: int, config: Config, keep_running_func: Callable[[], bool]
) -> Tuple[Thread, Server]:
class Handler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=config.output_path, **kwargs)
httpd = Server((addr, port), Handler)
def serve(httpd):
with httpd:
print(f"Serving at {base_url(addr, port)}")
httpd.serve_forever()
print("Stopped.")
thread = Thread(target=serve, args=[httpd])
thread.start()
return thread, httpd