forked from envoyproxy/nighthawk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnighthawk_grpc_service.py
102 lines (89 loc) · 3.45 KB
/
nighthawk_grpc_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import logging
import socket
import subprocess
import tempfile
import threading
import time
from common import IpVersion
# TODO(oschaaf): unify some of this code with the test server wrapper.
class NighthawkGrpcService(object):
"""
Class for running the Nighthawk gRPC service in a separate process.
Usage:
grpc_service = NighthawkGrpcService("/path/to/nighthawk_service"), "127.0.0.1", IpVersion.IPV4)
if grpc_service.start():
.... You can talk to the Nighthawk gRPC service at the 127.0.0.1:grpc_service.server_port ...
Attributes:
server_ip: IP address used by the gRPC service to listen.
server_port: An integer, indicates the port used by the gRPC service to listen. 0 means that the server is not listening.
"""
def __init__(self,
server_binary_path,
server_ip,
ip_version,
service_name="traffic-generator-service"):
"""Initializes Nighthawk gRPC service.
Args:
server_binary_path: A string, indicates where the nighthawk gRPC service binary resides
server_ip: IP address, indicates which ip address should be used by the gRPC service listener.
ip_version: IP Version, indicates if IPv4 or IPv6 should be used.
service_name: The Nighthawk service to run.
...
"""
assert ip_version != IpVersion.UNKNOWN
self.server_port = 0
self.server_ip = server_ip
self._server_process = None
self._ip_version = ip_version
self._server_binary_path = server_binary_path
self._socket_type = socket.AF_INET6 if ip_version == IpVersion.IPV6 else socket.AF_INET
self._server_thread = threading.Thread(target=self._serverThreadRunner)
self._address_file = None
self._service_name = service_name
def _serverThreadRunner(self):
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".tmp") as tmp:
self._address_file = tmp.name
args = [
self._server_binary_path, "--listener-address-file", self._address_file, "--listen",
"%s:0" % str(self.server_ip), "--service", self._service_name
]
logging.info("Nighthawk grpc service popen() args: [%s]" % args)
self._server_process = subprocess.Popen(args)
self._server_process.communicate()
self._address_file = None
def _waitUntilServerListening(self):
tries = 90
while tries > 0:
contents = ""
if not self._address_file is None:
try:
with open(self._address_file) as f:
contents = f.read().strip()
except IOError:
pass
if contents != "":
tmp = contents.split(":")
assert (len(tmp) >= 2)
self.server_port = int(tmp[len(tmp) - 1])
return True
time.sleep(0.5)
tries -= 1
logging.error("Timeout while waiting for server listener at %s:%s to accept connections.",
self.server_ip, self.server_port)
return False
def start(self):
"""
Starts the Nighthawk gRPC service. Returns True upon success, after which the server_port attribute
can be queried to get the listening port.
"""
self._server_thread.daemon = True
self._server_thread.start()
return self._waitUntilServerListening()
def stop(self):
"""
Signals the Nighthawk gRPC service to stop, waits for its termination, and returns the exit code of the associated process.
"""
self._server_process.terminate()
self._server_thread.join()
self.server_port = 0
return self._server_process.returncode