diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..fb472568 --- /dev/null +++ b/Makefile @@ -0,0 +1,15 @@ +# until we move telem service and data files to sphinx account, use the trex one +SHELL := /bin/bash + +clean: + @cd docker; docker compose down + +network-test: + @cd docker; docker compose up --build --detach + @echo "network test running" + +logs: + @cd docker; docker compose logs --follow + +bash: + @cd docker; docker compose exec ait-server bash diff --git a/ait/core/server/client.py b/ait/core/server/client.py index 67478326..40a441ae 100644 --- a/ait/core/server/client.py +++ b/ait/core/server/client.py @@ -134,6 +134,7 @@ def __init__( super(OutputClient, self).__init__( zmq_context, zmq_proxy_xsub_url, zmq_proxy_xpub_url ) + self.protocol = "UDP" output_spec = kwargs.get("output", None) if output_spec is None: raise ValueError(f"Invalid output client specification: {output_spec}") @@ -167,6 +168,7 @@ def __init__( and type(output_spec[1]) is str and type(output_spec[2]) is int ): + self.protocol = "TCP" self.host = output_spec[1] self.out_port = output_spec[2] self.pub = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -175,7 +177,11 @@ def __init__( self.context = zmq_context def publish(self, msg): - self.pub.sendto(msg, (self.host, int(self.out_port))) + if self.protocol == "TCP": + self.pub.connect((self.host, int(self.out_port))) + self.pub.sendall(msg) + else: + self.pub.sendto(msg, (self.host, int(self.out_port))) log.debug("Published message from {}".format(self)) @@ -289,6 +295,7 @@ def handle(self, socket, address): break log.debug("{} received message from port {}".format(self, address)) self.process(data) + gevent.sleep(0) # pass control back class TCPInputClient(ZMQClient): diff --git a/ait/core/server/handlers/__init__.py b/ait/core/server/handlers/__init__.py index 91215123..15a554e4 100644 --- a/ait/core/server/handlers/__init__.py +++ b/ait/core/server/handlers/__init__.py @@ -1,2 +1,3 @@ from .ccsds_packet_handler import * # noqa +from .debug_hanlder import DebugHandler # noqa from .packet_handler import * # noqa diff --git a/ait/core/server/handlers/debug_hanlder.py b/ait/core/server/handlers/debug_hanlder.py new file mode 100644 index 00000000..aab598b4 --- /dev/null +++ b/ait/core/server/handlers/debug_hanlder.py @@ -0,0 +1,12 @@ +import ait.core.log +from ait.core.server.handler import Handler + + +class DebugHandler(Handler): + def __init__(self, input_type=None, output_type=None, **kwargs): + super(DebugHandler, self).__init__(input_type, output_type) + self.handler_name = kwargs.get("handler_name", "DebugHandler") + + def handle(self, input_data): + ait.core.log.info(f"{self.handler_name} received {len(input_data)} bytes") + return input_data diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 00000000..495a917b --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,37 @@ +FROM registry.access.redhat.com/ubi8/ubi:8.9-1136 + +ENV LOG_LEVEL=INFO +ARG USER=ait +ARG GROUP=ait +ARG UID=1001 +ARG GID=1001 +ARG HOME=/home/$USER +ENV PROJECT_HOME=/home/$USER + +RUN dnf install -y python3.9 python3-pip \ + && yum install -y nc \ + && groupadd -r -g ${GID} ${GROUP} \ + && useradd -m -u ${UID} -g ${GROUP} ${USER} + +USER ait +WORKDIR $PROJECT_HOME +COPY --chown=${USER}:${GROUP} . $PROJECT_HOME/AIT-Core +RUN python3.9 -m pip install --user --upgrade pip setuptools virtualenvwrapper virtualenv poetry \ + && echo 'export PATH="${PROJECT_HOME}/.local/bin:$PATH"' >> ~/.bashrc \ + && echo 'export VIRTUALENVWRAPPER_PYTHON=/usr/bin/python3.9' >> ~/.bashrc \ + && echo 'export WORKON_HOME=${PROJECT_HOME}/.virtualenvs' >> ~/.bashrc \ + && echo 'export PROJECT_HOME=${PROJECT_HOME}' >> ~/.bashrc \ + && echo 'export VIRTUALENVWRAPPER_VIRTUALENV=${PROJECT_HOME}/.local/bin/virtualenv' >> ~/.bashrc \ + && echo 'source ${PROJECT_HOME}/.local/bin/virtualenvwrapper.sh' >> ~/.bashrc \ + && source ~/.bashrc \ + && cd $PROJECT_HOME \ + && echo 'if [ $VIRTUAL_ENV == "${PROJECT_HOME}/.virtualenvs/ait" ]; then' >> $PROJECT_HOME/.virtualenvs/postactivate \ + && echo 'export AIT_ROOT=${PROJECT_HOME}/AIT-Core' >> $PROJECT_HOME/.virtualenvs/postactivate \ + && echo 'export AIT_CONFIG=${PROJECT_HOME}/AIT-Core/docker/network-test-config.yaml' >> $PROJECT_HOME/.virtualenvs/postactivate \ + && echo 'fi' >> $PROJECT_HOME/.virtualenvs/postactivate \ + && cd AIT-Core \ + && mkvirtualenv ait \ + && poetry install +ENTRYPOINT ["/usr/bin/bash","-c"] +CMD ["source /home/ait/.bashrc && cd AIT-Core && workon ait && ait-server"] +#CMD ["sleep infinity"] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 00000000..1a745bed --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,68 @@ +services: + ait-server: + container_name: ait-server + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + depends_on: [tcp-server, udp-server] + environment: + LOG_LEVEL: INFO + networks: + - ait + + tcp-client: + container_name: tcp-client + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + depends_on: [ait-server] + command: ["source /home/ait/.bashrc && cd AIT-Core && workon ait && python scripts/network_tester.py client TCP ait-server 1234"] + networks: + - ait + + udp-client: + container_name: udp-client + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + depends_on: [ait-server] + command: ["source /home/ait/.bashrc && cd AIT-Core && workon ait && python scripts/network_tester.py client UDP ait-server 1235"] + networks: + - ait + + tcp-server: + container_name: tcp-server + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + command: ["source /home/ait/.bashrc && cd AIT-Core && workon ait && python -u scripts/network_tester.py server TCP 0.0.0.0 1237"] + networks: + - ait + + tcp-server-send: + container_name: tcp-server-send + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + command: ["source /home/ait/.bashrc && cd AIT-Core && workon ait && python -u scripts/network_tester.py server TCP-SEND 0.0.0.0 1238"] + networks: + - ait + + udp-server: + container_name: udp-server + platform: linux/amd64 + build: + context: ../ + dockerfile: ./docker/Dockerfile + command: ["source /home/ait/.bashrc && cd AIT-Core && workon ait && python -u scripts/network_tester.py server UDP 0.0.0.0 1236"] + networks: + - ait + +networks: + ait: + name: ait diff --git a/docker/network-test-config.yaml b/docker/network-test-config.yaml new file mode 100755 index 00000000..3380fb51 --- /dev/null +++ b/docker/network-test-config.yaml @@ -0,0 +1,61 @@ +default: + + cmddict: + filename: ../config/cmd.yaml + + tlmdict: + filename: ../config/tlm.yaml + + server: + + inbound-streams: + + - stream: + name: input_stream_debug_1 + input: + - "TCP" + - "0.0.0.0" + - 1234 + handlers: + - name: ait.core.server.handlers.DebugHandler + handler_name: "TCP Server" + + + - stream: + name: input_stream_debug_2 + input: + - "UDP" + - "0.0.0.0" + - 1235 + handlers: + - name: ait.core.server.handlers.DebugHandler + handler_name: "UDP Server" + + - stream: + name: input_stream_debug_3 + input: + - "TCP" + - "tcp-server-send" + - 1238 + handlers: + - name: ait.core.server.handlers.DebugHandler + handler_name: "TCP Client" + + outbound-streams: + - stream: + input: + - input_stream_debug_1 + name: output_stream_debug_1 + output: + - "TCP" + - "tcp-server" + - 1237 + + - stream: + name: output_stream_debug_2 + input: + - input_stream_debug_2 + output: + - "UDP" + - "udp-server" + - 1236 diff --git a/scripts/network_tester.py b/scripts/network_tester.py new file mode 100644 index 00000000..f8b3e110 --- /dev/null +++ b/scripts/network_tester.py @@ -0,0 +1,73 @@ +import socket +import sys +import time + +RATE = .01 # Do a little throttling so we dont completely thrash the server +BUFF_SIZE = 1024 +TEST_DATA = b'U'*BUFF_SIZE +def main(mode,protocol,host,port): + if mode == "server": + if protocol == "TCP" or protocol == "TCP-SEND": + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.bind((host, port)) + sock.listen() + connection, address = sock.accept() + with connection: + if protocol == "TCP": + ts = time.time() + data_size = 0 + while True: + buf = connection.recv(BUFF_SIZE) + data_size += len(buf) + te = time.time() + print(f"Received {data_size} bytes from {address} - est data rate: {data_size / (te-ts)}") + else: + while True: + connection.sendall(TEST_DATA) + time.sleep(RATE) + if protocol == "UDP": + server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + server_socket.bind((host, port)) + ts = time.time() + data_size = 0 + while True: + buf, address = server_socket.recvfrom(BUFF_SIZE) + data_size += len(buf) + te = time.time() + print(f"Received {data_size} bytes from {address} - est data rate: {data_size / (te-ts)}") + if mode == "client": + if protocol == "UDP": + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + while True: + try: + print(f"Sent {len(TEST_DATA)} bytes to {host}:{port}") + sock.sendto(TEST_DATA, (host, port)) + time.sleep(RATE) + except Exception as e: + print(e) + continue + if protocol == "TCP": + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + connected = False + while not connected: + try: + sock.connect((host, port)) + connected = True + except Exception as e: + print("retrying connection") + time.sleep(1) + while True: + try: + sock.send(TEST_DATA) + time.sleep(RATE) + except Exception as e: + print(e) + continue + +if __name__ == "__main__": + print(sys.argv) + mode = sys.argv[1] + protocol = sys.argv[2] + host = sys.argv[3] + port = sys.argv[4] + sys.exit(main(mode,protocol,host,int(port)))