diff --git a/tempesta_fw/t/functional/.gitignore b/tempesta_fw/t/functional/.gitignore new file mode 100644 index 0000000000..c83f53eb9d --- /dev/null +++ b/tempesta_fw/t/functional/.gitignore @@ -0,0 +1 @@ +tests_config.ini diff --git a/tempesta_fw/t/functional/README.md b/tempesta_fw/t/functional/README.md new file mode 100644 index 0000000000..d0b3fd2b79 --- /dev/null +++ b/tempesta_fw/t/functional/README.md @@ -0,0 +1,215 @@ +# Functional Tests for TempestaFW + +## Recommended configuration + +Running tests during development process can cause crashes to TempestaFW. +Since TempestaFW is implemented as a set of kernel modules it is not convenient +to run testing framework on the same host. It is recommended to run testing +framework on a separated host. + +Recommended test-beds: + +- Local testing. All parts of the testing framework are running on the same +host. The simpliest configuration to check that current revision of TempestaFW +passes all the functional tests. It is default configuration. +``` + ┌─────────────────────────────────────────────┐ + │ Testing Framework + TempestaFW + Web Server │ + └─────────────────────────────────────────────┘ +``` + +- With isolated testing framework. This preset more helpful for development +process, since testing framework itself is isolated from possible kernel +crashes or hangs. This configuration is recommended for TempestaFW developers. +``` + ┌───────────────────┐ + │ Testing Framework ├────┐ + └──────┬────────────┘ │ Management over SSH + │ ┌──┴──────────────────────┐ + │ │ TempestaFW + Web Server │ + │ └───────────────┬─────────┘ + └──────────────────────────────┘ + Separated network for test traffic +``` + +- Fully distributed. 3 different hosts with their own roles are used. This +configuration isolates traffic generated by benchmark utilities and traffic +generators in test network. Handy for stress and performance testing but require +a lot of resources. +``` + ┌───────────────────┐ + │ Testing Framework ├────┐ + └──────┬────────────┘ │ Management over SSH + │ ├────────────────────┐ + │ ┌──────┴─────┐ ┌─────┴──────┐ + │ │ TempestaFW │ │ Web Server │ + │ └──────┬─────┘ └─────┬──────┘ + └─────────────────┴────────────────────┘ + Separated network for test traffic +``` + +There is two different models of tests: workload tests and pure functional +tests. Workload tests uses fully functional HTTP benchmark programs (ab, siege, +wrk) and HTTP servers (Apache, nginx) to check TempestaFW behaviour. This type +of tests is used for schedulers, stress and performance testing. + +Pure functional tests check internal logic. Here combined HTTP client-server +server is used. It sends HTTP messages to TempestaFW, analyses how they are +forwarded to server, and vice versa, which server connections are used. + + +## Requirements + +- Host for testing framework: `Python2`, `python2-paramiko`, +`python-configparser`, `python-subprocess32`, `wrk`, `ab`, `siege` +- All hosts except previous one: `sftp-server` +- Host for running TempestaFW: Linux kernel with Tempesta, TempestaFW sources +- Host for running server: `nginx`, web content directory accessible by nginx + +`wrk` is an HTTP benchmarking tool, available from [Github](https://github.com/wg/wrk). + +`ab` is Apache benchmark tool, that can be found in `apache2-utils` package in +Debian or `httpd-tools` in CentOS. 
+ +`siege` is an HTTP benchmarking tool, available in `siege` package in Debian +and `siege` in [EPEL repository](https://dl.fedoraproject.org/pub/epel/7/x86_64/s/siege-4.0.2-2.el7.x86_64.rpm) +in CentOS. + +Unfortunately, CentOS does not have `python-subprocess32` package, but it can be +downloaded from [CentOS CBS](https://cbs.centos.org/koji/buildinfo?buildID=10904) + +Testing framework manages other hosts via SSH protocol, so the host running +testing framework must be able to be authenticated on other hosts by the key. +That can be done using `ssh-copy-id`. + + +## Run tests + +### Configuration + +Testing framework is configured via `tests_config.ini' file. Example +configuration is described in `tests_config.ini.sample' file. +You can also create default tests configuration by calling: + +```sh +$ ./run_tests.py -d +``` + +There is 4 sections in configuration: `General`, `Client`, `Tempesta`, `Server`. + +#### General Section + +`General` section describes the options related to testing framework itself. + +`verbose`: verbose level of output: +- `0` - quiet mode, result of each test is shown by symbols. `.` - passed, `F` - +failed, `u` - unexpected success, `x` - expected failure. `s` - skipped; +- `1` - Show test names and doc strings; +- `2` - Show tests names and performance counters; +- `3` - Full debug output. + +`Duration` option controls duration in seconds of each workload test. Use small +values to obtain results quickly add large for more heavy stress tests. Default +is `10` seconds. + +This group of options can be overridden by command line options, for more +information run tests with `-h` key. +```sh +$ ./run_tests.py -h +``` + +#### Client Section + +`workdir` - directory to place temporary files (configs, pidfiles, etc.) on the +host. R/W access is required, must be absolute path. + +`ab`, `siege` and `wrk` - absolute path to corresponding binaries. + +#### Tempesta Section + +`ip` - IPv4/IPv6 address of the host in test network. Used to run `wrk`, +`tempesta`, `nginx` and others with right parameters. Default is `127.0.0.1`. + +`hostname`, `port`, `user` - this options describes "management" interface of +the host. Testing framework uses this fields to connect to each test node. + +Options above are common for hosts `Server` and `Tempesta` and present in each +section. + +`workdir` - Directory with TempestaFW sources. Must be absolute path. + +#### Server Section + +Options listed in [Tempesta Section](#tempesta-section): `ip`, `hostname`, +`port`, `user` are also applied to this section. + +`workdir` - directory to place temporary files (configs, pidfiles, etc.) on the +host. R/W access is required, must be absolute path. + +`nginx` - absolute path to corresponding binary. + +`resourses` - absolute path to directory with sample web pages. Must be +reachable by `nginx`. 
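+
+For illustration, a minimal `tests_config.ini` sketch for the local test-bed
+may look like the following. All values below are placeholders, not defaults;
+`tests_config.ini.sample` (or `./run_tests.py -d`) remains the authoritative
+reference for option names and their meaning.
+```
+[General]
+verbose = 1
+duration = 10
+concurrent_connections = 10
+
+[Client]
+workdir = /tmp/client/
+ab = /usr/bin/ab
+wrk = /usr/bin/wrk
+siege = /usr/bin/siege
+
+[Tempesta]
+hostname = localhost
+ip = 127.0.0.1
+port = 22
+user = root
+workdir = /root/tempesta/
+
+[Server]
+hostname = localhost
+ip = 127.0.0.1
+port = 22
+user = root
+workdir = /tmp/nginx/
+nginx = /usr/sbin/nginx
+resources = /srv/http/
+```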
+ + +### Run tests + +To run all the tests simply run: +```sh +$ ./run_tests.py +``` + +The unittest module can be used from the command line to run tests from modules, +classes or even individual test methods: +```sh +$ python2 -m unittest test_module1 test_module2 +$ python2 -m unittest test_module.TestClass +$ python2 -m unittest test_module.TestClass.test_method +``` +Next command will run all tests from specified directory: +```sh +$ python2 -m unittest discover +``` +In this case verbosity of the tests names is controlled separately from +configuration flie: +```sh +$ python2 -m unittest -v test_module.TestClass +``` +For a list of all the command-line options: +```sh +$ python2 -m unittest -h +``` + + +## Adding new tests + +Adding new tests is easy. First, create new Python file in the new Python module +(directory) or existing one. +Name of the file must be started with `test_` +```sh +$ mkdir my_test +$ touch my_test/test_some_feature.py +$ echo "__all__ = [ 'test_some_feature' ]" >> my_test/__init.py__ +``` + +Import module `unittest`, and derive you test class from `stress.StressTest` +or `functional.FunctionalTest` +class from `testers` module. Add functions started with `test_`. Test class may +have any name. Here is example of `my_test/test_some_feature.py`: +```python +import unittest +from testers import stress + +class MyTester(stress.StressTest): + """ Test class documentation. """ + + tempesta_defconfig = 'cache 0;\n' + + def test_some_featue(self): + """ Test documentation. """ + self.generic_test_routine(self.tempesta_defconfig) +``` + +Tests can be skipped or marked as expected to fail. +More info at [Python documentation](https://docs.python.org/3/library/unittest.html). + diff --git a/tempesta_fw/t/functional/cache/__init__.py b/tempesta_fw/t/functional/cache/__init__.py new file mode 100644 index 0000000000..830e513649 --- /dev/null +++ b/tempesta_fw/t/functional/cache/__init__.py @@ -0,0 +1 @@ +__all__ = ['check_cache'] diff --git a/tempesta_fw/t/functional/cache/test_cache.py b/tempesta_fw/t/functional/cache/test_cache.py new file mode 100644 index 0000000000..c165f1d805 --- /dev/null +++ b/tempesta_fw/t/functional/cache/test_cache.py @@ -0,0 +1,100 @@ +"""Functional tests of caching responses.""" + +from __future__ import print_function +import unittest +from helpers import deproxy, tf_cfg, tempesta +from testers import functional + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' 
+__license__ = 'GPL2' + +# TODO: add tests for RFC compliance + +class TestCacheDisabled(functional.FunctionalTest): + + messages = 10 + + # Disable caching + cache_mode = 0 + + def chain(self, uri='/', cache_alowed=True): + if self.cache_mode == 0: + cache_alowed = False + if cache_alowed: + return cache_chains(self.messages, uri=uri) + return proxy_chains(self.messages, uri=uri) + + def test_cache_fulfill_all(self): + config = ('cache %d;\n' + 'cache_fulfill * *;\n' % self.cache_mode) + self.generic_test_routine(config, self.chain(cache_alowed=True)) + + def test_cache_bypass_all(self): + config = ('cache %d;\n' + 'cache_bypass * *;\n' % self.cache_mode) + self.generic_test_routine(config, self.chain(cache_alowed=False)) + + def mixed_config(self): + return ('cache %d;\n' + 'cache_fulfill suffix ".jpg" ".png";\n' + 'cache_bypass suffix ".avi";\n' + 'cache_bypass prefix "/static/dynamic_zone/";\n' + 'cache_fulfill prefix "/static/";\n' + % self.cache_mode) + + def test_cache_fulfill_suffix(self): + self.generic_test_routine( + self.mixed_config(), + self.chain(cache_alowed=True, uri='/picts/bear.jpg')) + + def test_cache_fulfill_suffix_2(self): + self.generic_test_routine( + self.mixed_config(), + self.chain(cache_alowed=True, uri='/jsnfsjk/jnd.png')) + + def test_cache_bypass_suffix(self): + self.generic_test_routine( + self.mixed_config(), + self.chain(cache_alowed=False, uri='/howto/film.avi')) + + def test_cache_bypass_prefix(self): + self.generic_test_routine( + self.mixed_config(), + self.chain(cache_alowed=False, + uri='/static/dynamic_zone/content.html')) + + def test_cache_fulfill_prefix(self): + self.generic_test_routine( + self.mixed_config(), + self.chain(cache_alowed=True, uri='/static/content.html')) + + +class TestCacheSharding(TestCacheDisabled): + + # Sharding mode. + cache_mode = 1 + +class TestCacheReplicated(TestCacheDisabled): + + # Replicated mode. + cache_mode = 2 + + +def cache_chain(uri): + cached_chain = functional.base_message_chain(uri=uri) + cached_chain.no_forward() + return cached_chain + +def proxy_chain(uri): + return functional.base_message_chain(uri=uri) + +def cache_chains(count, uri='/'): + chains = [proxy_chain(uri)] + chain = cache_chain(uri) + cached_chains = [chain for i in range (1, count)] + return chains + cached_chains + +def proxy_chains(count, uri='/'): + chain = proxy_chain(uri) + return [chain for i in range (count)] diff --git a/tempesta_fw/t/functional/helpers/__init__.py b/tempesta_fw/t/functional/helpers/__init__.py index f758b7ceb6..76c4c45172 100644 --- a/tempesta_fw/t/functional/helpers/__init__.py +++ b/tempesta_fw/t/functional/helpers/__init__.py @@ -1 +1 @@ -__all__ = [ 'be', 'cli', 'tfw' ] +__all__ = ['tf_cfg', 'deproxy', 'nginx', 'tempesta', 'siege', 'error'] diff --git a/tempesta_fw/t/functional/helpers/be.py b/tempesta_fw/t/functional/helpers/be.py deleted file mode 100644 index a8df0f1647..0000000000 --- a/tempesta_fw/t/functional/helpers/be.py +++ /dev/null @@ -1,122 +0,0 @@ -""" -A primitive back-end HTTP server implementation suitable for testing purposes. -""" - -from http.server import * -from threading import * - -__author__ = 'NatSys Lab' -__copyright__ = 'Copyright (C) 2014 NatSys Lab. (info@natsys-lab.com).' 
-__license__ = 'GPL2' - -def start(*args, **kwargs): - """A shortcut for BackendHTTPServer() constructor.""" - return BackendHTTPServer(*args, **kwargs) - -def _dummy_callback(method, uri, headers, body): - """An example of a backend_callback passed to BackendHTTPServer().""" - ret_code = 200 - ret_headers = { 'Content-Type': 'text/html; charset=utf-8' } - ret_body = 'Hello from dummy back-end callback' - return (ret_code, ret_headers, ret_body) - -class BackendHTTPServer(Thread, HTTPServer): - """ - Basically, this implementation does two things: - 1. It runs in a HTTP server in a separate thread. - 2. It handles all HTTP requests with a single backend_callback function - passed to the constructor. - - Also, right after initialization it blocks until a first TCP connection is - accepted. That is done to wait until Tempesta FW is connected. - So you have to start Tempesta first, and only then spawn the HTTP server. - """ - def __init__(self, backend_callback=_dummy_callback, port=8080, - wait_tfw_timeout_sec=20): - # Initialize HTTP server, bind/listen/etc. - self.accept_event = Event() - self.backend_callback = backend_callback - HTTPServer.__init__(self, ('127.0.0.1', port), BackendHTTPRequestHandler) - - # Start a background thread that accept()s connections. - kwargs = dict(poll_interval = 0.05) - Thread.__init__(self, target=self.serve_forever, kwargs=kwargs) - self.start() - - # Synchronize with Tempesta FW. - if (wait_tfw_timeout_sec): - self.wait_for_tfw(wait_tfw_timeout_sec) - - def wait_for_tfw(self, timeout): - """ - Sleep until a first accepted connection (presumably from Tempesta FW). - - FIXME: race conditions possible: after the connection is established, - the Tempesta FW must add the server to a load-balancing scheduler and - so on, so there is a time span, when the Tempesta is not yet ready to - forward incoming requests to the connected back-end server. At this - point we just hope that this time span is negligible. - BTW, that may be fixed by exporting state of Temepsta FW via debugfs. - """ - got_connection = self.accept_event.wait(timeout) - if (not got_connection): - self.shutdown() - msg = ("No connection from Tempesta FW (backend: {0}, timeout: {1})" - .format(self.server_address, timeout)) - raise Exception(msg) - - # get_request() calls accept() that blocks until the first connection. - # We just inject a synchronization with wait_for_tfw() there. - def get_request(self): - ret = super().get_request() - self.accept_event.set() - return ret - -class BackendHTTPRequestHandler(BaseHTTPRequestHandler): - """ - A wrapper for BackendHTTPServer.backend_callback. - The class simply pushes HTTP requests to the callback, and then builds - responses from data returned by the callback. - - That is done for simplicity. It is easier to code a single callback function - than a whole handler class. We have to code one in every test, and we don't - need much power in tests code, so we prefer a function over a class. - """ - def _handle_req_with_cb(self): - """ - Pass HTTP request to backend_callback, send a response containing data - returned by the callback. 
- """ - # Read body and push it to the callback - headers = self.headers - body_len = int(headers['Content-Length'] or 0) - body = self.rfile.read(body_len) - cb = self.server.backend_callback - resp_tuple = cb(self.command, self.path, headers, body) - - # The callback must return a tuple of exactly 3 elements: - # (http_code, headers_dict, body_str) - assert len(resp_tuple) == 3 - - # Send response fields provided by the callback. - code, headers, body = resp_tuple - body = bytes(body, 'UTF-8') - self.send_response(code) - for name, val in headers.items(): - self.send_header(name, val) - self.end_headers() - self.wfile.write(body) - print(body) - - # At this point Tempesta FW parser blocks HTTP/1.0 requests - protocol_version = 'HTTP/1.1' - - # Actual handler methods. We dispatch all them into our single function. - do_GET = _handle_req_with_cb - do_POST = _handle_req_with_cb - # Add do_METHOD here if you need anything beyond GET and POST methods. - - # By default, the base class prints a message for every incoming request. - # We don't want to see this flood in test results, so here is the stub. - def log_message(self, format, *args): - return diff --git a/tempesta_fw/t/functional/helpers/cli.py b/tempesta_fw/t/functional/helpers/cli.py deleted file mode 100644 index c7e164aee1..0000000000 --- a/tempesta_fw/t/functional/helpers/cli.py +++ /dev/null @@ -1,20 +0,0 @@ -""" -HTTP client emulator. - -These tests are built around a network of three participants: client, server and -a running Tempesta FW instance. This module is responsible for the client part. -It allows to connect to the Tempesta and send some data to it in various ways. -""" - -from socket import * -from contextlib import contextmanager - -__author__ = 'NatSys Lab' -__copyright__ = 'Copyright (C) 2014 NatSys Lab. (info@natsys-lab.com).' -__license__ = 'GPL2' - -@contextmanager -def connect_to_tfw(port=80, timeout_sec=5): - socket = create_connection(('127.0.0.1', port), timeout_sec) - yield socket - socket.close() diff --git a/tempesta_fw/t/functional/helpers/control.py b/tempesta_fw/t/functional/helpers/control.py new file mode 100644 index 0000000000..d7f2d2deea --- /dev/null +++ b/tempesta_fw/t/functional/helpers/control.py @@ -0,0 +1,370 @@ +""" Controlls node over SSH if remote, or via OS if local one. """ + +from __future__ import print_function +import abc +import re +import multiprocessing.dummy as multiprocessing +from . import tf_cfg, remote, error, nginx, tempesta, siege + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +#------------------------------------------------------------------------------- +# Clients +#------------------------------------------------------------------------------- + +class Client(object): + __metaclass__ = abc.ABCMeta + """ Base class for managing HTTP benchmark utilities. + + Command-line options can be added by appending `Client.options` list. + Also see comment in `Client.add_option_file()` function. + """ + + def __init__(self, binary, uri=''): + """ `uri` must be relative to server root. + + DO NOT format command line options in constructor! Instead format them + in `form_command()` function. This would allow to update options until + client will be started. 
See `Wrk` class for example + """ + self.node = remote.client + self.connections = int(tf_cfg.cfg.get('General', 'concurrent_connections')) + self.duration = int(tf_cfg.cfg.get('General', 'Duration')) + self.workdir = tf_cfg.cfg.get('Client', 'workdir') + self.set_uri(uri) + self.bin = tf_cfg.cfg.get_binary('Client', binary) + self.cmd = '' + self.clear_stats() + # List of command-line options. + self.options = [] + # List tuples (filename, content) to create corresponding files on + # remote node. + self.files = [] + # List of files to be removed from remote node after client finish. + self.cleanup_files = [] + + def set_uri(self, uri): + """ For some clients uri is an optional parameter, e.g. for Siege. + They use file with list of uris instead. Don't force clients to use + iri field. + """ + if uri: + server_addr = tf_cfg.cfg.get('Tempesta', 'ip') + self.uri = ''.join(['http://', server_addr, uri]) + else: + self.uri = '' + + def clear_stats(self): + self.requests = 0 + self.errors = 0 + + def cleanup(self): + for f in self.cleanup_files: + self.node.remove_file(f) + + def copy_files(self): + for (name, content) in self.files: + self.node.copy_file(name, content) + + @abc.abstractmethod + def parse_out(self, stdout, stderr): + """ Parse framework results. """ + print(stdout.decode('ascii'), stderr.decode('ascii')) + return True + + def form_command(self): + """ Prepare run command for benchmark to run on remote node. """ + cmd = ' '.join([self.bin] + self.options + [self.uri]) + return cmd + + def prepare(self): + self.cmd = self.form_command() + self.clear_stats() + self.copy_files() + return True + + def results(self): + return self.requests, self.errors + + def add_option_file(self, option, filename, content): + """ Helper for using files as client options: normaly file must be + copied to remote node, present in command line as parameter and + removed after client finish. + """ + full_name = ''.join([self.workdir, filename]) + self.files.append((filename, content)) + self.options.append('%s %s' % (option, full_name)) + self.cleanup_files.append(full_name) + + def set_user_agent(self, ua): + self.options.append('-H \'User-Agent: %s\'' % ua) + + +class Wrk(Client): + """ wrk - HTTP benchmark utility. """ + + def __init__(self, threads=-1, uri='/'): + Client.__init__(self, 'wrk', uri) + self.threads = threads + + def form_command(self): + self.options.append('-d %d' % self.duration) + # At this moment threads equals user defined value or maximum theads + # count for remote node. + if self.threads == -1: + self.threads = remote.get_max_thread_count(self.node) + threads = self.threads if self.connections > 1 else 1 + self.options.append('-t %d' % threads) + self.options.append('-c %d' % self.connections) + return Client.form_command(self) + + def parse_out(self, stdout, stderr): + m = re.search(r'(\d+) requests in ', stdout) + if m: + self.requests = int(m.group(1)) + m = re.search(r'Non-2xx or 3xx responses: (\d+)', stdout) + if m: + self.errors = int(m.group(1)) + return True + + +class Ab(Client): + """ Apache benchmark. """ + + def __init__(self, uri='/'): + Client.__init__(self, 'ab', uri=uri) + + def form_command(self): + # Don't show progress. 
+ self.options.append('-q') + self.options.append('-t %d' % self.duration) + self.options.append('-c %d' % self.connections) + return Client.form_command(self) + + def parse_out(self, stdout, stderr): + m = re.search(r'Complete requests:\s+(\d+)', stdout) + if m: + self.requests = int(m.group(1)) + m = re.search(r'Non-2xx responses:\s+(\d+)', stdout) + if m: + self.errors = int(m.group(1)) + m = re.search(r'Failed requests:\s+(\d+)', stdout) + if m: + self.errors += int(m.group(1)) + return True + + +class Siege(Client): + """ HTTP regression test and benchmark utility. """ + + def __init__(self, uri='/'): + Client.__init__(self, 'siege', uri=uri) + self.rc = siege.Config() + self.copy_rc = True + + def form_command(self): + # Benchmark: no delays between requests. + self.options.append('-b') + self.options.append('-t %dS' % self.duration) + self.options.append('-c %d' % self.connections) + # Add RC file. + if self.copy_rc: + self.add_option_file('-R', self.rc.filename, self.rc.get_config()) + else: + self.options.append('-R %s%s' % (self.workdir, self.rc.filename)) + return Client.form_command(self) + + def parse_out(self, stdout, stderr): + """ Siege prints results to stderr. """ + m = re.search(r'Successful transactions:\s+(\d+)', stderr) + if m: + self.requests = int(m.group(1)) + m = re.search(r'Failed transactions:\s+(\d+)', stderr) + if m: + self.errors = int(m.group(1)) + return True + + def set_user_agent(self, ua): + self.options.append('-A \'%s\'' % ua) + +#------------------------------------------------------------------------------- +# Client helpers +#------------------------------------------------------------------------------- + +def client_run_blocking(client): + tf_cfg.dbg(3, '\tRunning HTTP client on %s' % remote.client.host) + error.assertTrue(client.prepare()) + stdout, stderr = remote.client.run_cmd(client.cmd) + error.assertTrue(client.parse_out(stdout, stderr)) + client.cleanup() + +def __clients_prepare(client): + return client.prepare() + +def __clients_run(client): + return remote.client.run_cmd(client.cmd, timeout=(client.duration + 5)) + +def __clients_parse_output(args): + client, (stdout, stderr) = args + return client.parse_out(stdout, stderr) + +def __clients_cleanup(client): + return client.cleanup() + +def clients_run_parallel(clients): + tf_cfg.dbg(3, ('\tRunning %d HTTP clients on %s' % + (len(clients), remote.client.host))) + if not len(clients): + return True + # In most cases all Siege instances use the same config file. no need to + # copy in many times. 
+ if isinstance(clients[0], Siege): + for i in range(1, len(clients)): + clients[i].copy_rc = False + + pool = multiprocessing.Pool(len(clients)) + results = pool.map(__clients_prepare, clients) + error.assertTrue(all(results), + 'Some HTTP clients failed on prepare stage!') + + results = pool.map(__clients_run, clients) + + parse_args = [(clients[i], results[i]) for i in range(len(clients))] + pool.map(__clients_parse_output, parse_args) + pool.map(__clients_cleanup, clients) + + +#------------------------------------------------------------------------------- +# Tempesta +#------------------------------------------------------------------------------- + + +class Tempesta(object): + + def __init__(self): + self.node = remote.tempesta + self.workdir = self.node.workdir + self.config_name = 'tempesta_fw.conf' + self.config = tempesta.Config() + self.stats = tempesta.Stats() + self.host = tf_cfg.cfg.get('Tempesta', 'hostname') + self.err_msg = ' '.join(["Can't %s TempestaFW on", self.host]) + + def start(self): + tf_cfg.dbg(3, '\tStarting TempestaFW on %s' % self.host) + self.stats.clear() + # Use relative path to work dir to get rid of extra mkdir command. + self.node.copy_file(''.join(['etc/', self.config_name]), + self.config.get_config()) + cmd = '%s/scripts/tempesta.sh --start' % self.workdir + self.node.run_cmd(cmd, err_msg=(self.err_msg % 'start')) + + def stop(self): + """ Stop and unload all TempestaFW modules. """ + tf_cfg.dbg(3, '\tStoping TempestaFW on %s' % self.host) + cmd = '%s/scripts/tempesta.sh --stop' % self.workdir + # Tempesta waits for active connections 5 secs before closing. + timeout = remote.DEFAULT_TIMEOUT + 5 + self.node.run_cmd(cmd, timeout=timeout, err_msg=(self.err_msg % 'stop')) + + def get_stats(self): + cmd = 'cat /proc/tempesta/perfstat' + stdout, _ = self.node.run_cmd(cmd, + err_msg=(self.err_msg % 'get stats of')) + self.stats.parse(stdout) + +#------------------------------------------------------------------------------- +# Server +#------------------------------------------------------------------------------- + +class Nginx(object): + + def __init__(self, listen_port, workers=1): + self.node = remote.server + self.workdir = tf_cfg.cfg.get('Server', 'workdir') + self.config = nginx.Config(self.workdir, listen_port, workers) + self.clear_stats() + # Configure number of connections used by TempestaFW. + self.conns_n = tempesta.server_conns_default() + self.err_msg = "Can't %s Nginx on %s" + self.active_conns = 0 + self.requests = 0 + + def get_name(self): + return ':'.join([self.node.host, str(self.config.port)]) + + def start(self): + tf_cfg.dbg(3, '\tStarting Nginx on %s' % self.get_name()) + self.clear_stats() + # Copy nginx config to working directory on 'server' host. + self.node.copy_file(self.config.config_name, self.config.config) + # Nginx forks on start, no background threads needed, + # but it holds stderr open after demonisation. 
+ config_file = ''.join([self.workdir, self.config.config_name]) + cmd = ' '.join([tf_cfg.cfg.get('Server', 'nginx'), '-c', config_file]) + self.node.run_cmd(cmd, ignore_stderr=True, + err_msg=(self.err_msg % ('start', self.get_name()))) + + def stop(self): + tf_cfg.dbg(3, '\tStoping Nginx on %s' % self.get_name()) + pid_file = ''.join([self.workdir, self.config.pidfile_name]) + config_file = ''.join([self.workdir, self.config.config_name]) + cmd = '[ -f %s ] && kill -s TERM $(cat %s)' % (pid_file, pid_file) + self.node.run_cmd(cmd, ignore_stderr=True, + err_msg=(self.err_msg % ('stop', self.get_name()))) + self.node.remove_file(config_file) + + def get_stats(self): + """ Nginx doesn't have counters for every virtual host. Spawn separate + instances instead + """ + self.stats_ask_times += 1 + # In default tests configuration Nginx status available on + # `nginx_status` page. + uri = 'http://%s:%d/nginx_status' % (self.node.host, self.config.port) + cmd = 'curl %s' % uri + out, _ = remote.client.run_cmd( + cmd, err_msg=(self.err_msg % ('get stats of', self.get_name()))) + m = re.search(r'Active connections: (\d+) \n' + r'server accepts handled requests\n \d+ \d+ (\d+)', + out) + if m: + # Current request increments active connections for nginx. + self.active_conns = int(m.group(1)) - 1 + # Get rid of stats requests influence to statistics. + self.requests = int(m.group(2)) - self.stats_ask_times + + def clear_stats(self): + self.active_conns = 0 + self.requests = 0 + self.stats_ask_times = 0 + +#------------------------------------------------------------------------------- +# Server helpers +#------------------------------------------------------------------------------- + +MAX_THREADS = 32 + +def __servers_pool_size(n_servers): + if remote.server.is_remote(): + # By default MasSessions in sshd config is 10. Do not overflow it. + return 4 + else: + return min(n_servers, MAX_THREADS) + +def servers_start(servers): + threads = __servers_pool_size(len(servers)) + pool = multiprocessing.Pool(threads) + pool.map(Nginx.start, servers) + +def servers_stop(servers): + threads = __servers_pool_size(len(servers)) + pool = multiprocessing.Pool(threads) + pool.map(Nginx.stop, servers) + +def servers_get_stats(servers): + threads = __servers_pool_size(len(servers)) + pool = multiprocessing.Pool(threads) + pool.map(Nginx.get_stats, servers) diff --git a/tempesta_fw/t/functional/helpers/deproxy.py b/tempesta_fw/t/functional/helpers/deproxy.py new file mode 100644 index 0000000000..ad149a09ba --- /dev/null +++ b/tempesta_fw/t/functional/helpers/deproxy.py @@ -0,0 +1,661 @@ +from __future__ import print_function +import abc +import httplib +from StringIO import StringIO +import asyncore +import select +import socket +import sys +import time +from BaseHTTPServer import BaseHTTPRequestHandler +from . import error, tf_cfg, tempesta + + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +#------------------------------------------------------------------------------- +# Utils +#------------------------------------------------------------------------------- + +class ParseError(Exception): + pass + +class IncompliteMessage(ParseError): + pass + + +class HeaderCollection(object): + """ + A collection class for HTTP Headers. This class combines aspects of a list + and a dict. Lookup is always case-insenitive. A key can be added multiple + times with different values, and all of those values will be kept. 
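+
+    Example (header names and values here are arbitrary; only methods defined
+    below are used):
+
+        headers = HeaderCollection()
+        headers.add('Warning', '110 Response is stale')
+        headers.add('warning', '299 Miscellaneous warning')
+        assert 'WARNING' in headers            # lookup ignores case
+        assert headers['warning'] == '110 Response is stale'
+        assert len(list(headers.find_all('Warning'))) == 2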
+ """ + + def __init__(self, mapping=None, **kwargs): + self.headers = [] + if mapping is not None: + for k, v in mapping.iteritems(): + self.add(k, v) + if kwargs is not None: + for k, v in kwargs.iteritems(): + self.add(k, v) + + def __contains__(self, item): + item = item.lower() + for header in self.headers: + if header[0].lower() == item: + return True + return False + + def __len__(self): + return self.headers.__len__() + + def __getitem__(self, key): + key = key.lower() + for header in self.headers: + if header[0].lower() == key: + return header[1] + + def __setitem__(self, key, value): + lower = key.lower() + for i, header in enumerate(self.headers): + if header[0].lower() == lower: + headers[i] = (header[0], value) + return + else: + self.add(key.lower(), value) + + def __delitem__(self, key): + self.delete_all(name=key) + + def __iter__(self): + return self.iterkeys() + + def add(self, name, value): + self.headers.append((name, value,)) + + def find_all(self, name): + name = name.lower() + for header in self.headers: + if header[0].lower() == name: + yield header[1] + + def delete_all(self, name): + lower = name.lower() + self.headers = [header for header in self.headers + if header[0].lower() != lower] + + def iterkeys(self): + for header in self.headers: + yield header[0] + + def itervalues(self): + for header in self.headers: + yield header[1] + + def iteritems(self): + for header in self.headers: + yield header + + def keys(self): + return [key.lower() for key in self.iterkeys()] + + def values(self): + return [value for value in self.itervalues()] + + def items(self): + return self.headers + + def get(self, key, default=None): + if key in self: + return self[key] + return default + + @staticmethod + def from_stream(rfile, no_crlf=False): + headers = HeaderCollection() + line = rfile.readline() + while not (line == '\r\n' or line == '\n'): + if no_crlf and not line: + break + if not line or (line[-1] != '\n'): + raise IncompliteMessage('Incomplite headers') + line = line.rstrip('\r\n') + try: + name, value = line.split(':', 1) + except: + raise ParseError('Invalid header format') + name = name.strip() + value = value.strip() + line = rfile.readline() + while line.startswith(' ') or line.startswith('\t'): + # Continuation lines - see RFC 2616, section 4.2 + value += ' ' + line.strip() + line = rfile.readline() + headers.add(name, value) + return headers + + def __eq__(left, right): + h_left = set([(hed.lower(), val) for hed, val in left.items()]) + h_right = set([(hed.lower(), val) for hed, val in right.items()]) + return h_left == h_right + + def __ne__(left, right): + return not HeaderCollection.__eq__(left, right) + + def __str__(self): + return ''.join(['%s: %s\r\n' % (hed, val) for hed, val in self.items()]) + + def __repr__(self): + return repr(self.headers) + + +#------------------------------------------------------------------------------- +# HTTP Messages +#------------------------------------------------------------------------------- + +class HttpMessage(object): + __metaclass__ = abc.ABCMeta + + def __init__(self, message_text=None, body_parsing=True): + self.msg = '' + self.body_parsing = True + self.headers = HeaderCollection() + self.trailer = HeaderCollection() + self.body = '' + self.version = "HTTP/0.9" # default version. 
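+        # Parse the message right away if raw text was supplied.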
+ if message_text: + self.parse_text(message_text, body_parsing) + + def parse_text(self, message_text, body_parsing=True): + self.body_parsing = body_parsing + self.msg = message_text + stream = StringIO(self.msg) + self.__parse(stream) + + def __parse(self, stream): + self.parse_firstline(stream) + self.parse_headers(stream) + self.body = '' + self.parse_body(stream) + + @abc.abstractmethod + def parse_firstline(self, stream): + pass + + def parse_headers(self, stream): + self.headers = HeaderCollection().from_stream(stream) + + def parse_body(self, stream): + if self.body_parsing and 'Transfer-Encoding' in self.headers: + chunked = False + enc = self.headers['Transfer-Encoding'] + option = enc.split(',')[-1] # take the last option + + if option.strip().lower() == 'chunked': + self.read_chunked_body(stream) + else: + error.bug('Not implemented!') + elif self.body_parsing and 'Content-Length' in self.headers: + length = int(self.headers['Content-Length']) + self.read_sized_body(stream, length) + else: + self.body = stream.read() + + def read_chunked_body(self, stream): + while True: + line = stream.readline() + self.body += line + try: + size = int(line.rstrip('\r\n')) + assert size >= 0 + chunk = stream.readline() + self.body += chunk + + assert len(chunk.rstrip('\r\n')) == size + assert chunk[-1] == '\n' + if size == 0: + break + except: + raise ParseError('Error in chuncked body') + + # Parsing trailer will eat last CRLF + self.parse_trailer(stream) + + def read_sized_body(self, stream, size): + if size == 0: + return + self.body = stream.read(size) + # Remove CRLF + line = stream.readline() + if line.rstrip('\r\n'): + raise ParseError('No CRLF after body.') + if len(self.body) != size: + raise ParseError(("Wrong body size: expect %d but got %d!" 
+ % (size, len(self.body)))) + + def parse_trailer(self, stream): + self.trailer = HeaderCollection().from_stream(stream, no_crlf=True) + + @abc.abstractmethod + def __eq__(left, right): + return ((left.headers == right.headers) and + (left.body == right.body) and + (left.trailer == right.trailer)) + + @abc.abstractmethod + def __ne__(left, right): + return not HttpMessage.__eq__(left, right) + + def __str__(self): + return ''.join([str(self.headers), '\r\n', self.body, str(self.trailer)]) + + def update(self, firstline): + message = '\r\n'.join([firstline, str(self)]) + self.parse_text(message) + + @staticmethod + def date_time_string(timestamp=None): + """Return the current date and time formatted for a message header.""" + weekdayname = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] + monthname = [None, + 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', + 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec'] + if timestamp is None: + timestamp = time.time() + year, month, day, hh, mm, ss, wd, y, z = time.gmtime(timestamp) + s = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % ( + weekdayname[wd], day, monthname[month], year, hh, mm, ss) + return s + + @staticmethod + def create(first_line, headers, date=False, srv_version=None, body=None): + if date: + date = ''.join(['Date: ', HttpMessage.date_time_string()]) + headers.append(date) + if srv_version: + version = ''.join(['Server: ', srv_version]) + headers.append(version) + end = ['\r\n'] + if body != None: + end = ['', body] + return '\r\n'.join([first_line] + headers + end) + + +class Request(HttpMessage): + + methods = ['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', + 'CONNECT', 'PATCH'] + + def __init__(self, *args, **kwargs): + self.method = None + self.uri = None + HttpMessage.__init__(self, *args, **kwargs) + + def parse_firstline(self, stream): + requestline = stream.readline() + if requestline[-1] != '\n': + raise IncompliteMessage('Incomplite request line!') + + words = requestline.rstrip('\r\n').split() + if len(words) == 3: + self.method, self.uri, self.version = words + elif len(words) == 2: + self.method, self.uri = words + else: + raise ParseError('Invalid request line!') + if not self.method in self.methods: + raise ParseError('Invalid request method!') + + def __eq__(left, right): + return ((left.method == right.method) + and (left.version == right.version) + and (left.uri == right.uri) + and HttpMessage.__eq__(left, right)) + + def __ne__(left, right): + return not Request.__eq__(left, right) + + def update(self): + HttpMessage.update(self, + ' '.join([self.method, self.uri, self.version])) + + @staticmethod + def create(method, headers, uri='/', version='HTTP/1.1', date=False, + body=None): + first_line = ' '.join([method, uri, version]) + msg = HttpMessage.create(first_line, headers, date=date, body=body) + return Request(msg) + + +class Response(HttpMessage): + + def __init__(self, *args, **kwargs): + self.status = None # Status-Code + self.reason = None # Reason-Phrase + HttpMessage.__init__(self, *args, **kwargs) + + def parse_firstline(self, stream): + statusline = stream.readline() + if statusline[-1] != '\n': + raise IncompliteMessage('Incomplite Status line!') + + words = statusline.rstrip('\r\n').split() + if len(words) >= 3: + self.version, self.status = words[0:2] + self.reason = words[2:] + elif len(words) == 2: + self.version, self.status = words + else: + raise ParseError('Invalid Status line!') + try: + status = int(self.status) + assert status > 100 and status < 600 + except: + raise ParseError('Invalid Status 
code!') + + def __eq__(left, right): + return ((left.status == right.status) + and (left.version == right.version) + and (left.reason == right.reason) + and HttpMessage.__eq__(left, right)) + + def __ne__(left, right): + return not Response.__eq__(left, right) + + def update(self): + status = int(self.status) + reason = reason = BaseHTTPRequestHandler.responses[status][0] + HttpMessage.update(self, + ' '.join([self.version, self.status, reason])) + + @staticmethod + def create(status, headers, version='HTTP/1.1', date=False, + srv_version=None, body=None): + reason = BaseHTTPRequestHandler.responses + first_line = ' '.join([version, str(status), reason[status][0]]) + msg = HttpMessage.create(first_line, headers, date=date, + srv_version=srv_version, body=body) + return Response(msg) + +#------------------------------------------------------------------------------- +# HTTP Client/Server +#------------------------------------------------------------------------------- +MAX_MESSAGE_SIZE = 65536 + +class Client(asyncore.dispatcher): + + def __init__(self, host=None, port=80): + asyncore.dispatcher.__init__(self) + self.request_buffer = '' + self.response_buffer = '' + self.tester = None + if host is None: + host = 'Tempesta' + addr = tf_cfg.cfg.get(host, 'ip') + tf_cfg.dbg(4, '\tDeproxy: Client: Conect to %s:%d.' % (addr, port)) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.connect((addr, port)) + + def clear(self): + self.request_buffer = '' + self.response_buffer = '' + + def set_request(self, request): + if request: + self.request_buffer = request.msg + + def set_tester(self, tester): + self.tester = tester + + def handle_connect(self): + pass + + def handle_close(self): + self.close() + + def handle_read(self): + self.response_buffer += self.recv(MAX_MESSAGE_SIZE) + if not self.response_buffer: + return + tf_cfg.dbg(4, '\tDeproxy: Client: Recieve response from server.') + tf_cfg.dbg(5, self.response_buffer) + try: + response = Response(self.response_buffer) + except IncompliteMessage: + return + except ParseError: + tf_cfg.dbg(4, ('Deproxy: Client: Can\'t parse message\n' + '<<<<<\n%s>>>>>' + % self.response_buffer)) + raise + if self.tester: + self.tester.recieved_response(response) + self.response_buffer = '' + + def writable(self): + if not self.tester: + return False + return self.tester.is_srvs_ready() and (len(self.request_buffer) > 0) + + def handle_write(self): + tf_cfg.dbg(4, '\tDeproxy: Client: Send request to server.') + tf_cfg.dbg(5, self.request_buffer) + sent = self.send(self.request_buffer) + self.request_buffer = self.request_buffer[sent:] + + def handle_error(self): + t, v, tb = sys.exc_info() + error.bug('\tDeproxy: Client: %s' % v) + + +class ServerConnection(asyncore.dispatcher_with_send): + + def __init__(self, tester, server, sock=None, keep_alive=None): + asyncore.dispatcher_with_send.__init__(self, sock) + self.tester = tester + self.server = server + self.keep_alive = keep_alive + self.responses_done = 0 + self.request_buffer = '' + self.tester.register_srv_connection(self) + tf_cfg.dbg(4, '\tDeproxy: SrvConnection: New server connection.') + + def handle_read(self): + self.request_buffer += self.recv(MAX_MESSAGE_SIZE) + try: + request = Request(self.request_buffer) + except IncompliteMessage: + return + except ParseError: + tf_cfg.dbg(4, ('Deproxy: SrvConnection: Can\'t parse message\n' + '<<<<<\n%s>>>>>' + % self.request_buffer)) + # Hande will be called even if buffer is empty. 
+ if not self.request_buffer: + return + tf_cfg.dbg(4, '\tDeproxy: SrvConnection: Recieve request from Tempesta.') + tf_cfg.dbg(5, self.request_buffer) + if not self.tester: + return + response = self.tester.recieved_forwarded_request(request, self) + self.request_buffer = '' + if not response: + return + if response.msg: + tf_cfg.dbg(4, '\tDeproxy: SrvConnection: Send response to Tempesta.') + tf_cfg.dbg(5, response.msg) + self.send(response.msg) + else: + tf_cfg.dbg(4, '\tDeproxy: SrvConnection: Try send invalid response.') + if self.keep_alive: + self.responses_done += 1 + if self.responses_done == self.keep_alive: + self.handle_close() + + def handle_error(self): + t, v, tb = sys.exc_info() + error.bug('\tDeproxy: SrvConnection: %s' % v) + + def handle_close(self): + if self.tester: + self.tester.remove_srv_connection(self) + asyncore.dispatcher_with_send.handle_close(self) + + def close(self): + tf_cfg.dbg(4, '\tDeproxy: SrvConnection: Close connection.') + asyncore.dispatcher_with_send.close(self) + + +class Server(asyncore.dispatcher): + + def __init__(self, port, host=None, connections=None, keep_alive=None): + asyncore.dispatcher.__init__(self) + self.tester = None + self.port = port + if connections is None: + connections = tempesta.server_conns_default() + self.conns_n = connections + self.keep_alive = keep_alive + if host is None: + host = 'Client' + addr = tf_cfg.cfg.get('Client', 'ip') + tf_cfg.dbg(4, '\tDeproxy: Server: Start on %s:%d.' % (addr, port)) + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.set_reuse_addr() + self.bind((addr, port)) + self.listen(socket.SOMAXCONN) + + def set_tester(self, tester): + self.tester = tester + + def handle_accept(self): + pair = self.accept() + if pair is not None: + sock, addr = pair + handler = ServerConnection(self.tester, server=self, sock=sock, + keep_alive=self.keep_alive) + + def handle_error(self): + t, v, tb = sys.exc_info() + error.bug('\tDeproxy: Server: %s' % v) + + +#------------------------------------------------------------------------------- +# Message Chain +#------------------------------------------------------------------------------- +TEST_CHAIN_TIMEOUT = 1.0 + +class MessageChain(object): + + def __init__(self, request, expected_response, forwarded_request=None, + server_response=None): + # Request to be sent from Client. + self.request = request + # Response recieved on client. + self.response = expected_response + # Expexted request forwarded to server by Tempesta to server. + self.fwd_request = forwarded_request if forwarded_request else Request() + # Server response in reply to forwarded request. + self.server_response = ( + server_response if expected_response else Response()) + + def no_forward(self): + # Expexted request forwarded to server by Tempesta to server. + self.fwd_request = Request() + # Server response in reply to forwarded request. + self.server_response = Response() + + @staticmethod + def empty(): + return MessageChain(Request(), Response()) + + +class Deproxy(object): + + def __init__(self, message_chains, client, servers, register=True): + self.message_chains = message_chains + self.client = client + self.servers = servers + # Current chain of expected messages. + self.current_chain = None + # Current chain of recieved messages. + self.recieved_chain = None + # Timeout to wait for test completion. + self.timeout = 1 + # Registered connections. 
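+        # The list is populated via register_srv_connection() as soon as
+        # ServerConnection instances are created.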
+ self.srv_connections = [] + if register: + self.register_tester() + + def register_tester(self): + self.client.set_tester(self) + for server in self.servers: + server.set_tester(self) + + def loop(self, timeout=TEST_CHAIN_TIMEOUT): + """Poll for socket events no more than `timeout` seconds.""" + try: + eta = time.time() + timeout + map = asyncore.socket_map + + if hasattr(select, 'poll'): + poll_fun = asyncore.poll2 + else: + poll_fun = asyncore.poll + + while (eta > time.time()) and map: + poll_fun(min(self.timeout, timeout), map) + except asyncore.ExitNow: + pass + + def run(self): + for self.current_chain in self.message_chains: + self.recieved_chain = MessageChain.empty() + self.client.clear() + self.client.set_request(self.current_chain.request) + self.loop() + self.check_expectations() + + def check_expectations(self): + for message in ['response', 'fwd_request']: + expected = getattr(self.current_chain, message) + recieved = getattr(self.recieved_chain, message) + assert expected == recieved, \ + ("Recieved message (%s) does not suit expected one!\n\n" + "\tRecieved:\n<<<<<|\n%s|>>>>>\n" + "\tExpected:\n<<<<<|\n%s|>>>>>\n" + % (message, recieved.msg, expected.msg)) + + def recieved_response(self, response): + """Client recieved response for its request.""" + self.recieved_chain.response = response + raise asyncore.ExitNow + + def recieved_forwarded_request(self, request, connection): + self.recieved_chain.fwd_request = request + return self.current_chain.server_response + + def register_srv_connection(self, connection): + self.srv_connections.append(connection) + + def remove_srv_connection(self, connection): + # Normaly we have the connection in the list, but do not crash test + # framework if that is not true. + try: + self.srv_connections.remove(connection) + except: + pass + + def is_srvs_ready(self): + expected_conns_n = sum([s.conns_n for s in self.servers]) + return expected_conns_n == len(self.srv_connections) + + def close_all(self): + self.client.close() + for conn in self.srv_connections: + conn.close() + for server in self.servers: + server.close() diff --git a/tempesta_fw/t/functional/helpers/error.py b/tempesta_fw/t/functional/helpers/error.py new file mode 100644 index 0000000000..a7d10b3bb4 --- /dev/null +++ b/tempesta_fw/t/functional/helpers/error.py @@ -0,0 +1,28 @@ +from __future__ import print_function + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class Error(Exception): + """Base exception class for unrecoverable framework errors. + + Python unittest treats AssertionError as test failure rather than the error. + Separate exception class is needed to indicate that error happen and + test framework is not working as expected. + """ + pass + +def assertFalse(expression, msg=''): + """Raise test framework error if 'expression' is true.""" + if expression: + raise Error(msg) + +def assertTrue(expression, msg=''): + """Raise test framework error if 'expression' is false.""" + if not expression: + raise Error(msg) + +def bug(msg=''): + """Raise test framework error.""" + raise Error(msg) diff --git a/tempesta_fw/t/functional/helpers/nginx.py b/tempesta_fw/t/functional/helpers/nginx.py new file mode 100644 index 0000000000..741b9f1ed7 --- /dev/null +++ b/tempesta_fw/t/functional/helpers/nginx.py @@ -0,0 +1,92 @@ +""" Nginx helpers. """ + +from __future__ import print_function +import re +from . import tf_cfg, error + +__author__ = 'Tempesta Technologies, Inc.' 
+__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class Config(object): + """ Nginx config file builder. """ + + def __init__(self, workdir, port, workers): + self.port = 80 # keep port linked with default config + self.config = """ +pid /var/run/nginx.pid; +worker_processes auto; + +events { + worker_connections 1024; + use epoll; +} + +http { + keepalive_timeout 65; + keepalive_requests 100; + sendfile on; + tcp_nopush on; + tcp_nodelay on; + + open_file_cache max=1000; + open_file_cache_valid 30s; + open_file_cache_min_uses 2; + open_file_cache_errors off; + + # [ debug | info | notice | warn | error | crit | alert | emerg ] + # Fully disable log errors. + error_log /dev/null emerg; + + # Disable access log altogether. + access_log off; + + server { + listen 80; + + location / { + root /srv/http; + } + location /nginx_status { + stub_status on; + } + } +} + """ + self.set_port(port) + self.set_workdir(workdir) + self.set_workers(workers) + self.set_resourse_location() + + def __replace(self, exp, value): + regex = re.compile(exp) + self.config = regex.sub(value, self.config) + + def set_ka(self, req, timeout=65): + """ Set Keepalive parameters for server. """ + self.__replace(r'keepalive_timeout[ ]+(\d+);', + ' '.join(['keepalive_timeout', str(timeout), ';'])) + self.__replace(r'keepalive_requests[ ]+(\d+);', + ' '.join(['keepalive_requests', str(req), ';'])) + + def set_workers(self, workers='auto'): + self.__replace(r'worker_processes[ ]+(\w+);', + ' '.join(['worker_processes', str(workers), ';'])) + + def set_port(self, port): + self.port = int(port) + self.config_name = 'nginx_%d.conf' % port + self.pidfile_name = 'nginx_%d.pid' % port + self.__replace(r'listen[ ]+(\w+);', + ' '.join(['listen', str(port), ';'])) + + def set_workdir(self, workdir): + error.assertTrue(workdir) + self.__replace(r'pid[ ]+([\w._/]+);', + ''.join(['pid ', workdir, self.pidfile_name, ' ;'])) + + def set_resourse_location(self, location=''): + if not location: + location = tf_cfg.cfg.get('Server', 'resources') + self.__replace(r'root[ ]+([\w._/]+);', + ' '.join(['root', location, ';'])) diff --git a/tempesta_fw/t/functional/helpers/remote.py b/tempesta_fw/t/functional/helpers/remote.py new file mode 100644 index 0000000000..d10b0df1ad --- /dev/null +++ b/tempesta_fw/t/functional/helpers/remote.py @@ -0,0 +1,200 @@ +""" Controlls node over SSH if remote, or via OS if local one. """ + +from __future__ import print_function +import re +import os +import abc +import paramiko +import subprocess32 as subprocess +from . import tf_cfg, error + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +# Don't remove files from remote node. Helpful for tests development. +DEBUG_FILES = False +# Defult timeout for SSH sessions and command processing. 
+DEFAULT_TIMEOUT = 5 + +class Node(object): + __metaclass__ = abc.ABCMeta + + def __init__(self, hostname, workdir): + self.host = hostname + self.workdir = workdir + + def is_remote(self): + return self.host != 'localhost' + + @abc.abstractmethod + def run_cmd(self, cmd, timeout=DEFAULT_TIMEOUT, ignore_stderr=False, + err_msg=''): pass + + @abc.abstractmethod + def mkdir(self, path): pass + + @abc.abstractmethod + def copy_file(self, filename, content, path=None): pass + + @abc.abstractmethod + def remove_file(self, filename): pass + + +class LocalNode(Node): + def __init__(self, hostname, workdir): + Node.__init__(self, hostname, workdir) + + def run_cmd(self, cmd, timeout=DEFAULT_TIMEOUT, ignore_stderr=False, + err_msg=''): + tf_cfg.dbg(4, "\tRun command '%s' on host %s" % (cmd, self.host)) + stdout = '' + stderr = '' + stderr_pipe = (open(os.devnull, 'w') if ignore_stderr + else subprocess.PIPE) + with subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, + stderr=stderr_pipe) as p: + try: + stdout, stderr = p.communicate(timeout) + assert p.returncode == 0, "Return code is not 0." + except Exception as e: + if not err_msg: + err_msg = ("Error running command '%s' on %s: %s" % + (cmd, self.host, e)) + error.bug(err_msg) + return stdout, stderr + + def mkdir(self, path): + try: + os.makedirs(path) + except OSError: + if not os.path.isdir(path): + raise + + def copy_file(self, filename, content, path=None): + # Create dir first. + if path is None: + path = self.workdir + else: + self.mkdir(path) + filename = ''.join([path, filename]) + with open(filename, 'w') as f: + f.write(content) + + + def remove_file(self, filename): + if DEBUG_FILES: + return + if os.path.isfile(filename): + os.remove(filename) + + +class RemoteNode(Node): + def __init__(self, hostname, workdir, user, port=22): + Node.__init__(self, hostname, workdir) + self.user = user + self.port = port + self.connect() + + def connect(self): + """ Open SSH connection to node if remote. Returns False on SSH errors. + """ + try: + self.ssh = paramiko.SSHClient() + self.ssh.load_system_host_keys() + # Workaround: paramiko prefer RSA keys to ECDSA, so add RSA + # key to known_hosts. + self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.ssh.connect(hostname=self.host, username=self.user, + port=self.port, timeout=DEFAULT_TIMEOUT) + except Exception as e: + error.bug("Error connecting %s: %s" % (self.host, e)) + + def close(self): + """ Release SSH connection without waitning for GC. """ + self.ssh.close() + + def run_cmd(self, cmd, timeout=DEFAULT_TIMEOUT, ignore_stderr=False, + err_msg=''): + tf_cfg.dbg(4, "\tRun command '%s' on host %s" % (cmd, self.host)) + stderr = '' + stdout = '' + try: + _, out_f, err_f = self.ssh.exec_command(cmd, timeout=timeout) + stdout = out_f.read() + if not ignore_stderr: + stderr = err_f.read() + assert out_f.channel.recv_exit_status() == 0, "Return code is not 0." + except Exception as e: + if not err_msg: + err_msg = ("Error running command '%s' on %s: %s" % + (cmd, self.host, e)) + error.bug(err_msg) + return stdout, stderr + + def mkdir(self, path): + self.run_cmd('mkdir -p %s' % path) + + def copy_file(self, filename, content, path=None): + # Create directory it is not default workdir. 
+ if path is None: + path = self.workdir + else: + self.mkdir(path) + filename = ''.join([path, filename]) + try: + sftp = self.ssh.open_sftp() + sfile = sftp.file(filename, 'w', -1) + sfile.write(content) + sfile.flush() + sftp.close() + except Exception as e: + error.bug(("Error copying file %s to %s: %s" % + (filename, self.host, e))) + + def remove_file(self, filename): + if DEBUG_FILES: + return + try: + sftp = self.ssh.open_sftp() + sftp.unlink(filename) + sftp.close() + except Exception as e: + error.bug(("Error removing file %s on %s: %s" % + (filename, self.host, e))) + + + +def create_node(host): + hostname = tf_cfg.cfg.get(host, 'hostname') + workdir = tf_cfg.cfg.get(host, 'workdir') + + if hostname != 'localhost': + port = int(tf_cfg.cfg.get(host, 'port')) + username = tf_cfg.cfg.get(host, 'user') + return RemoteNode(hostname, workdir, username, port) + return LocalNode(hostname, workdir) + + +#------------------------------------------------------------------------------- +# Helper functions. +#------------------------------------------------------------------------------- + +def get_max_thread_count(node): + out, _ = node.run_cmd('grep -c processor /proc/cpuinfo') + m = re.match(r'^(\d+)$', out) + if not m: + return 1 + return int(m.group(1).decode('ascii')) + +#------------------------------------------------------------------------------- +# Global accessable SSH/Local connections +#------------------------------------------------------------------------------- +client = create_node('Client') +tempesta = create_node('Tempesta') +server = create_node('Server') + +# Create working directories on client and server nodes. Work directory on +# Tempesta contains sources and must exist. +for node in [client, server]: + node.mkdir(node.workdir) diff --git a/tempesta_fw/t/functional/helpers/siege.py b/tempesta_fw/t/functional/helpers/siege.py new file mode 100644 index 0000000000..292e375695 --- /dev/null +++ b/tempesta_fw/t/functional/helpers/siege.py @@ -0,0 +1,56 @@ +from __future__ import print_function + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class Config(object): + """ Siege.rc file helper. + + Most of siege options require to update file and cannot be changed using + command-line arguments. + """ + + def __init__(self): + self.filename = 'siege.rc' + self.options = [ + # Default concurrent. + ('concurrent', '25'), + # Disable printing eash transaction to stdout. + ('verbose', 'false'), + # Leave color for humas. It breaks regexes. + ('color', 'off'), + ('protocol', 'HTTP/1.1'), + ('quiet', 'false'), + ('show-logfile', 'false'), + ('logging', 'false'), + ('accept-encoding', 'gzip;deflate'), + # Cache revalidation. + ('cache', 'false'), + # Method used, when running with `-g` option. + ('gmethod', 'HEAD'), + # Enable http parser. + ('parser', 'true'), + # Cookies support. + ('cookies', 'true'), + # Number of total failures allowed before siege aborts. + ('failures', '10'), + # Keep-Alive or close connections after each request. 
+ ('connection', 'close')] + + def set_option(self, option, value): + for i in range(len(self.options)): + opt, _ = self.options[i] + if opt == option: + if value == '': + del self.options[i] + else: + self.options[i] = (option, value) + return + if value: + self.options.append((option, value)) + + + def get_config(self): + cfg = '\n'.join(['%s = %s' % opt for opt in self.options]) + return cfg diff --git a/tempesta_fw/t/functional/helpers/tempesta.py b/tempesta_fw/t/functional/helpers/tempesta.py new file mode 100644 index 0000000000..9a3a3eac27 --- /dev/null +++ b/tempesta_fw/t/functional/helpers/tempesta.py @@ -0,0 +1,178 @@ +import re +import os +from . import error + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +# Tempesta capabilities: +def servers_in_group(): + """ Max servers in server group. """ + return 32 + +def server_conns_default(): + """ Default connections to single upstream server. """ + return 32 + +def server_conns_max(): + """ Maximum connections to single upstream server. """ + return 32 + +def upstream_port_start_from(): + """ Start value for upstream servers listen port. Just for convinence. """ + return 8000 + +# Vesion_info_cache +tfw_version = '' + +def version(): + """TempestaFW current version. Defined in tempesta_fw.h: + #define TFW_VERSION "0.5.0-pre6" + """ + global tfw_version + if tfw_version: + return tfw_version + version_header = ''.join([os.path.dirname(os.path.realpath(__file__)), + '/../../../tempesta_fw.h']) + with open(version_header, 'r') as header: + read_data = header.read() + m = re.search(r'#define TFW_VERSION\s+"([0-9a-z.-]+)"', read_data) + error.assertTrue(m) + tfw_version = m.group(1) + return tfw_version + error.bug() + + +class Stats(object): + """ Parser for TempestaFW performance statistics (/proc/tempesta/perfstat). 
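+
+ The file is a plain-text list of counters, roughly one per line in the
+ form '<name>   : <value>', for example (illustrative values):
+
+ Cache hits                              : 12
+ Client messages received                : 3456
+
+ parse() extracts every known counter with a '<name>\s+: (\d+)' regex and
+ falls back to -1 for counters missing from the output.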
+ """ + + def __init__(self): + self.clear() + + def clear(self): + self.ss_pfl_hits = 0 + self.ss_pfl_misses = 0 + self.cache_hits = 0 + self.cache_misses = 0 + self.cl_msg_received = 0 + self.cl_msg_forwarded = 0 + self.cl_msg_served_from_cache = 0 + self.cl_msg_parsing_errors = 0 + self.cl_msg_filtered_out = 0 + self.cl_msg_other_errors = 0 + self.cl_conn_attempts = 0 + self.cl_established_connections = 0 + self.cl_conns_active = 0 + self.cl_rx_bytes = 0 + self.srv_msg_received = 0 + self.srv_msg_forwarded = 0 + self.srv_msg_parsing_errors = 0 + self.srv_msg_filtered_out = 0 + self.srv_msg_other_errors = 0 + self.srv_conn_attempts = 0 + self.srv_established_connections = 0 + self.srv_conns_active = 0 + self.srv_rx_bytes = 0 + + def parse(self, stats): + self.ss_pfl_hits = self.parse_option(stats, 'SS pfl hits') + self.ss_pfl_misses = self.parse_option(stats, 'SS pfl misses') + + self.cache_hits = self.parse_option(stats, 'Cache hits') + self.cache_misses = self.parse_option(stats, 'Cache misses') + + self.cl_msg_received = self.parse_option( + stats, 'Client messages received') + self.cl_msg_forwarded = self.parse_option( + stats, 'Client messages forwarded') + self.cl_msg_served_from_cache = self.parse_option( + stats, 'Client messages served from cache') + self.cl_msg_parsing_errors = self.parse_option( + stats, 'Client messages parsing errors') + self.cl_msg_filtered_out = self.parse_option( + stats, 'Client messages filtered out') + self.cl_msg_other_errors = self.parse_option( + stats, 'Client messages other errors') + self.cl_conn_attempts = self.parse_option( + stats, 'Client connection attempts') + self.cl_established_connections = self.parse_option( + stats, 'Client established connections') + self.cl_conns_active = self.parse_option( + stats, 'Client connections active') + self.cl_rx_bytes = self.parse_option( + stats, 'Client RX bytes') + + self.srv_msg_received = self.parse_option( + stats, 'Server messages received') + self.srv_msg_forwarded = self.parse_option( + stats, 'Server messages forwarded') + self.srv_msg_parsing_errors = self.parse_option( + stats, 'Server messages parsing errors') + self.srv_msg_filtered_out = self.parse_option( + stats, 'Server messages filtered out') + self.srv_msg_other_errors = self.parse_option( + stats, 'Server messages other errors') + self.srv_conn_attempts = self.parse_option( + stats, 'Server connection attempts') + self.srv_established_connections = self.parse_option( + stats, 'Server established connections') + self.srv_conns_active = self.parse_option( + stats, 'Server connections active') + self.srv_rx_bytes = self.parse_option( + stats, 'Server RX bytes') + + def parse_option(self, stats, name): + s = r'%s\s+: (\d+)' % name + m = re.search(s.encode('ascii'), stats) + if m: + return int(m.group(1)) + return -1 + +#------------------------------------------------------------------------------- +# Config Helpers +#------------------------------------------------------------------------------- + +class ServerGroup(object): + + def __init__(self, name='default', sched='round-robin'): + self.name = name + self.sched = sched + self.servers = [] + + def add_server(self, ip, port, conns=server_conns_default()): + error.assertTrue(conns <= server_conns_max()) + error.assertTrue(len(self.servers) < servers_in_group()) + conns_str = (' conns_n=%d' % conns if (conns != server_conns_default()) + else '') + self.servers.append('server %s:%d%s;' % (ip, port, conns_str)) + + def get_config(self): + sg = '' + if self.name == 'default': + sg = 
'\n'.join(['sched %s;' % self.sched] + self.servers) + else: + sg = '\n'.join( + ['srv_group %s {' % self.name] + ['sched %s;' % self.sched] + + self.servers + ['}']) + return sg + +class Config(object): + """ Creates Tempesta config file. """ + def __init__(self): + self.server_groups = [] + self.defconfig = '' + + def add_sg(self, new_sg): + for sg in self.server_groups: + error.assertTrue(sg.name != new_sg.name) + self.server_groups.append(new_sg) + + def get_config(self): + cfg = '\n'.join([sg.get_config() for sg in self.server_groups] + + [self.defconfig]) + return cfg + + def set_defconfig(self, config): + self.defconfig = config diff --git a/tempesta_fw/t/functional/helpers/tf_cfg.py b/tempesta_fw/t/functional/helpers/tf_cfg.py new file mode 100644 index 0000000000..b2623b6854 --- /dev/null +++ b/tempesta_fw/t/functional/helpers/tf_cfg.py @@ -0,0 +1,101 @@ +""" Test framework configuration options. +""" + +from __future__ import print_function, unicode_literals +import os +import sys +import configparser + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class TestFrameworkCfg(object): + + def __init__(self, cfg_file): + self.defaults() + self.file_err = True + if os.path.isfile(cfg_file): + self.file_err = False + self.config.read(cfg_file) + + def defaults(self): + self.config = configparser.ConfigParser() + self.config.read_dict({'General': {'verbose': '0', + 'duration': '10', + 'concurrent_connections': '10'}, + 'Client': {'ip': '127.0.0.1', + 'hostname': 'localhost', + 'ab': 'ab', + 'wrk': 'wrk', + 'siege': 'siege', + 'workdir': '/tmp/client'}, + 'Tempesta': {'ip': '127.0.0.1', + 'hostname': 'localhost', + 'user': 'root', + 'port': '22', + 'workdir': '/root/tempesta'}, + 'Server': {'ip': '127.0.0.1', + 'hostname': 'localhost', + 'user': 'root', + 'port': '22', + 'nginx': 'nginx', + 'workdir': '/tmp/nginx', + 'resources': '/var/www/html/'} + }) + + def inc_verbose(self): + verbose = int(self.config['General']['Verbose']) + 1 + self.config['General']['Verbose'] = str(verbose) + + def set_duration(self, val): + try: + int(val) + except ValueError: + return False + self.config['General']['Duration'] = val + return True + + def get(self, section, opt): + return self.config[section][opt] + + def get_binary(self, section, binary): + if self.config.has_option(section, binary): + return self.config[section][binary] + return binary + + def save_defaults(self): + self.defaults() + with open('tests_config.ini', 'w') as configfile: + self.config.write(configfile) + + def check(self): + if self.file_err: + return False, 'Configuration file "tests_config.ini" not found.' + #TODO: check configuration options + for host in ['Client', 'Tempesta', 'Server']: + if not self.config[host]['workdir'].endswith('/'): + self.config[host]['workdir'] += '/' + + if self.config['Client']['hostname'] != 'localhost': + return False, "Running clients on remote host is not supported yet." 
+ return True, '' + +def debug(): + return int(cfg.get('General', 'Verbose')) >= 3 + +def v_level(): + return int(cfg.get('General', 'Verbose')) + +def dbg(level, *args, **kwargs): + if int(cfg.get('General', 'Verbose')) >= level: + print(*args, **kwargs) + +CFG_FILE = ''.join([os.path.dirname(os.path.realpath(__file__)), + '/../tests_config.ini']) +cfg = TestFrameworkCfg(CFG_FILE) + +r, reason = cfg.check() +if not r: + print("Error:", reason) + sys.exit(1) diff --git a/tempesta_fw/t/functional/helpers/tfw.py b/tempesta_fw/t/functional/helpers/tfw.py deleted file mode 100644 index 8beeffa9e7..0000000000 --- a/tempesta_fw/t/functional/helpers/tfw.py +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python - -""" -Helpers for interacting with Tempesta FW (start/stop, configure, etc). -""" - -import os -import subprocess -import sys - -from . import teardown - -__author__ = 'NatSys Lab' -__copyright__ = 'Copyright (C) 2014 NatSys Lab. (info@natsys-lab.com).' -__license__ = 'GPL2' - -_functest_dir = os.path.dirname(os.path.realpath(sys.argv[0])) -_tempesta_dir = os.path.normpath(os.path.join(_functest_dir, '../../../')) - -def start(): - _sh("SYNC_SOCKET=./sync_socket TDB=./tempesta_db ./tempesta.sh start") - -def stop(): - _sh("./tempesta.sh stop") - -def _sh(command): - return subprocess.check_output(command, shell=True, cwd=_tempesta_dir) - -def _is_started(): - return (0 == subprocess.call("lsmod | grep -q tempesta", shell=True)) - -def _stop_if_started(): - if (_is_started()): - stop() - -# Ensure we start and stop in a pristine environment. -assert (not _is_started()) -# The teardown line is commented-out because we have the issue: -# #10 -Oops on shutdown -# At this point it is not solved and Tempesta FW simply can't be stopped. -# TODO: un-comment it after the issue is fixed. -#teardown.register(_stop_if_started) diff --git a/tempesta_fw/t/functional/regression/__init__.py b/tempesta_fw/t/functional/regression/__init__.py new file mode 100644 index 0000000000..6e9d069cac --- /dev/null +++ b/tempesta_fw/t/functional/regression/__init__.py @@ -0,0 +1 @@ +__all__ = ['test_shutdown', 'test_srv_failovering', 'test_stress_failovering'] diff --git a/tempesta_fw/t/functional/regression/test_shutdown.py b/tempesta_fw/t/functional/regression/test_shutdown.py new file mode 100644 index 0000000000..9897249982 --- /dev/null +++ b/tempesta_fw/t/functional/regression/test_shutdown.py @@ -0,0 +1,106 @@ +""" +Test TempestaFW shutdown when multiple connections between TempestaFW and +clients/servers are established. +""" + +from __future__ import print_function +import unittest +from helpers import deproxy, tf_cfg, tempesta, remote +from testers import functional + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class ShutdownTest(functional.FunctionalTest): + """Spawn a lot of clients, a lot of servers, make some requests but do not + send responses. Shutdown TempestaFW while all the connections are up. + No crushes must happen. + + TODO: Add test with half of the servers blocked by netfilter. 
+ """ + + def setUp(self): + functional.FunctionalTest.setUp(self) + self.clients = [] + + def tearDown(self): + if self.tester: + self.tester.close_all() + if self.tempesta: + self.tempesta.stop() + + def create_client(self): + for i in range(100): + self.clients.append(deproxy.Client()) + + def create_servers(self): + self.create_servers_helper(tempesta.servers_in_group()) + + def init(self): + defconfig = 'cache 0;\n' + self.tempesta.config.set_defconfig(defconfig) + self.create_servers() + self.configure_tempesta() + self.tempesta.start() + self.create_client() + self.tester = ShutdownTester(self.clients, self.servers) + + def test_shutdown(self): + self.init() + # Run loop for small time to allow clients and servers process socket + # events. + self.tester.loop() + self.tempesta.stop() + # Run random command on remote node to see if it is still alive. + remote.tempesta.run_cmd('uname') + self.tempesta = None + + def test_shutdown_with_traffic(self): + self.init() + # Run loop for small time to allow clients and servers process socket + # events. + self.tester.run() + self.tempesta.stop() + # Run requests once more time. + self.tester.run() + # Run random command on remote node to see if it is still alive. + remote.tempesta.run_cmd('uname') + self.tempesta = None + + +class ShutdownTester(deproxy.Deproxy): + + def __init__(self, clients, servers): + deproxy.Deproxy.__init__(self, None, None, servers, register=False) + self.clients = clients + request = deproxy.Request( + "GET / HTTP/1.1\r\n" + "Host: host\r\n" + "User-Agent: curl/7.53.1\r\n" + "\r\n") + response = deproxy.Response() + self.current_chain = deproxy.MessageChain(request, response, + server_response=response) + self.register_tester() + + def register_tester(self): + for client in self.clients: + client.set_tester(self) + for server in self.servers: + server.set_tester(self) + + def run(self): + self.recieved_chain = deproxy.MessageChain.empty() + for client in self.clients: + client.clear() + client.set_request(self.current_chain.request) + self.loop() + + def close_all(self): + for client in self.clients: + client.close() + for conn in self.srv_connections: + conn.close() + for server in self.servers: + server.close() diff --git a/tempesta_fw/t/functional/regression/test_srv_failovering.py b/tempesta_fw/t/functional/regression/test_srv_failovering.py new file mode 100644 index 0000000000..3eb5061de8 --- /dev/null +++ b/tempesta_fw/t/functional/regression/test_srv_failovering.py @@ -0,0 +1,88 @@ +""" +Test Servers connections failovering. +""" + +from __future__ import print_function +import unittest +import random +import socket +import asyncore +from helpers import deproxy, tf_cfg, tempesta, remote +from testers import functional + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class FailoveringTest(functional.FunctionalTest): + """Spawn a lot of servers, close half on connections + + TODO: Check that TempestaFW keeps configured failovering intervals. 
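+
+ Connections are dropped in two ways: gracefully via handle_close() and
+ abruptly via shutdown(SHUT_RDWR); in both cases TempestaFW is expected
+ to re-establish all server connections within timeout_limit seconds.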
+ """ + + timeout_limit = 5.0 + + def create_servers(self): + self.create_servers_helper(tempesta.servers_in_group()) + + def create_tester(self, message_chain): + self.tester = FailoverTester(message_chain, self.client, self.servers) + + def init(self): + self.tempesta.config.set_defconfig('') + + self.create_servers() + self.configure_tempesta() + + self.tempesta.start() + self.create_client() + chains = [deproxy.MessageChain.empty()] + self.create_tester(chains) + + def test_on_close(self): + self.init() + self.tester.loop(self.timeout_limit) + self.assertTrue(self.tester.is_srvs_ready()) + + self.tester.random_close() + self.assertFalse(self.tester.is_srvs_ready()) + # Wait for connections failovering. + self.tester.loop(self.timeout_limit) + self.assertTrue(self.tester.is_srvs_ready()) + + def test_on_shutdown(self): + self.init() + self.tester.loop(self.timeout_limit) + self.assertTrue(self.tester.is_srvs_ready()) + + self.tester.random_shutdown() + self.assertFalse(self.tester.is_srvs_ready()) + # Wait for connections failovering. + self.tester.loop(self.timeout_limit) + self.assertTrue(self.tester.is_srvs_ready()) + + +class FailoverTester(deproxy.Deproxy): + + def __init__(self, *args, **kwargs): + deproxy.Deproxy.__init__(self, *args, **kwargs) + self.expected_conns_n = sum([s.conns_n for s in self.servers]) + + def register_srv_connection(self, connection): + deproxy.Deproxy.register_srv_connection(self, connection) + # Brake the loop wait if all connections are online. + if self.expected_conns_n == len(self.srv_connections): + raise asyncore.ExitNow + + def random_close(self): + for i in range (self.expected_conns_n // 4): + conn = random.choice(self.srv_connections) + if conn: + conn.handle_close() + + def random_shutdown(self): + for i in range (self.expected_conns_n // 4): + conn = random.choice(self.srv_connections) + if conn: + conn.socket.shutdown(socket.SHUT_RDWR) + conn.handle_close() diff --git a/tempesta_fw/t/functional/regression/test_stress_failovering.py b/tempesta_fw/t/functional/regression/test_stress_failovering.py new file mode 100644 index 0000000000..dff1245c98 --- /dev/null +++ b/tempesta_fw/t/functional/regression/test_stress_failovering.py @@ -0,0 +1,70 @@ +""" +Stress failovering testing: generate HTTP traffic with wrk, all requests must +be served correctly. No matter how much keep-alive request are configured on +the server. + +Refer to issue #383 for more information. +""" + +from __future__ import print_function +import unittest +import sys +from helpers import tempesta, control +from testers import stress + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + + +class RoudRobinFailovering(stress.StressTest): + """Use Round-robin scheduler (default) with different keep-alive requests + configuration on HTTP server. + + Use one server with default connections count. Since overall amount of + connections are small, failovering procedure will be loaded a lot. + We do not need a lot of connections for this test: it will just make + connections to live a little bit more under load. + """ + + def create_servers(self): + """Create sever with very little connections count. + """ + port = tempesta.upstream_port_start_from() + server = control.Nginx(listen_port=port) + server.conns_n = 4 + self.servers = [server] + + def run_test(self, ka_reqs): + """Configure server's keep-alive requests count for one session and + start generic test. 
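+
+ A small ka_reqs value makes the server close connections after only a
+ few requests, so failovering runs constantly under load; sys.maxsize
+ effectively disables the keep-alive limit.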
+ """ + for s in self.servers: + s.config.set_ka(ka_reqs) + self.generic_test_routine('cache 0;\n') + + @unittest.expectedFailure + def test_limited_ka(self): + """Small amount of keep-alive requests, make Tempesta failover + connections on a high rates. + """ + self.run_test(100) + + def test_unlimited_ka(self): + """Almost unlimited maximum amount of requests during one connection. + No connections failovering in this case. + """ + self.run_test(sys.maxsize) + + +class HashFailovering(RoudRobinFailovering): + """Absolutely the same as RoudRobinFailovering, bus uses `hash` scheduler + instead. + """ + + def configure_tempesta(self): + """Configure Tempesta to use hash scheduler instead of default one. + """ + stress.StressTest.configure_tempesta(self) + for sg in self.tempesta.config.server_groups: + sg.sched = 'hash' diff --git a/tempesta_fw/t/functional/run_all_tests.sh b/tempesta_fw/t/functional/run_all_tests.sh deleted file mode 100755 index 6e3d43f73b..0000000000 --- a/tempesta_fw/t/functional/run_all_tests.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/bash -# -# 2014. Written by NatSys Lab. (info@natsys-lab.com). - -function run() { - echo run: $1 - $(dirname $0)/$1 - if [ $? -ne 0 ] - then - echo FAILED: $1 - exit -1 - fi - echo PASSED: $1 -} - -echo -echo ------------------------------------------------------------------ -echo Running functional tests... -echo ------------------------------------------------------------------ - -# Doesn't pass yet. -# run fragmented_requests.py diff --git a/tempesta_fw/t/functional/run_tests.py b/tempesta_fw/t/functional/run_tests.py new file mode 100755 index 0000000000..884f5373ce --- /dev/null +++ b/tempesta_fw/t/functional/run_tests.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python2 + +import unittest, getopt, sys +from helpers import tf_cfg + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +def usage(): + print( +""" +Functional tests for TempestaFW. + +Test Framework Configuration is stored in 'tests_config.ini', Use '-d' option +to get defaults. Normally 3 machines are used to run tests: one to run HTTP +clients, second for TempestaFw it self and third one for HTTP servers. Running +tests on localhost is possible but not recomended for development environment. + +Remote nodes controlled via SSH protocol. Make sure that you can be autorised by +key, not password. `ssh-copy-id` can be used for that. + +-h, --help - Print this help and exit. +-v, --verbose - Enable verbose output. +-d, --defaults - Save defaut configuration to config file + and exit. +-t, --duration - Duration of every single test. +-f, --failfast - Stop tests after first error. +""" + ) + +fail_fast = False + +try: + options, remainder = getopt.getopt(sys.argv[1:], 'hvdt:f', + ['help', 'verbose', 'defaults', + 'duration=', 'failfast']) + +except getopt.GetoptError as e: + print(e) + usage() + sys.exit(2) + +for opt, arg in options: + if opt in ('-f', '--failfast'): + fail_fast = True + if opt in ('-v', '--verbose'): + tf_cfg.cfg.inc_verbose() + if opt in ('-t', '--duration'): + if tf_cfg.cfg.set_duration(arg) == False: + print('Invalid option: ', opt, arg) + usage() + sys.exit(0) + elif opt in ('-d', '--save'): + tf_cfg.cfg.save_defaults() + sys.exit(0) + elif opt in ('-h', '--help'): + usage() + sys.exit(0) + +r, reason = tf_cfg.cfg.check() +if not r: + print(reason) + sys.exit(1) + +# Verbose level for unit tests must be > 1. 
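+# unittest's TextTestRunner is quiet at verbosity 0, prints dots at 1 and
+# per-test names only at 2 and above, hence the shift by one below.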
+v_level = int(tf_cfg.cfg.get('General', 'Verbose')) + 1 + +# Install Ctrl-C handler for graceful stop. +unittest.installHandler() + +print(""" +---------------------------------------------------------------------- +Running functional tests... +---------------------------------------------------------------------- +""") + +#run tests +loader = unittest.TestLoader() +tests = loader.discover('.') +testRunner = unittest.runner.TextTestRunner(verbosity = v_level, + failfast = fail_fast, + descriptions = False) +testRunner.run(tests) diff --git a/tempesta_fw/t/functional/sched/__init__.py b/tempesta_fw/t/functional/sched/__init__.py new file mode 100644 index 0000000000..be0cda34e6 --- /dev/null +++ b/tempesta_fw/t/functional/sched/__init__.py @@ -0,0 +1 @@ +__all__ = ['test_rr', 'test_hash_func', 'test_hash_stress', 'test_http'] diff --git a/tempesta_fw/t/functional/sched/test_hash_func.py b/tempesta_fw/t/functional/sched/test_hash_func.py new file mode 100644 index 0000000000..cd90eae9e8 --- /dev/null +++ b/tempesta_fw/t/functional/sched/test_hash_func.py @@ -0,0 +1,77 @@ +""" +Hash scheduler pins resourses to specific servers and connections. Functional +test, check that the same server connection is used for the same resource. +""" + +from __future__ import print_function +import unittest +from helpers import deproxy, tf_cfg, tempesta +from testers import functional + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class HashSchedulerTest(functional.FunctionalTest): + """Hash scheduler functional test, check that the same server connection + is used for the same resource. + """ + + messages = 100 + + def configure_tempesta(self): + functional.FunctionalTest.configure_tempesta(self) + for sg in self.tempesta.config.server_groups: + sg.sched = 'hash' + + def create_tester(self, message_chain): + self.tester = HashTester(message_chain, self.client, self.servers) + + def chains(self): + chain = functional.base_message_chain() + return [chain for i in range (self.messages)] + + def test_hash_scheduler(self): + self.generic_test_routine('cache 0;\n', self.chains()) + + +class HashSchedulerFailoveredTest(HashSchedulerTest): + """Same as HashSchedulerTest, but we will force servers to close connections + time to time. + """ + + def create_servers(self): + port = tempesta.upstream_port_start_from() + keep_alive = self.messages // 10 + self.servers = [deproxy.Server(port=port, keep_alive=keep_alive)] + + +class HashTester(deproxy.Deproxy): + + def __init__(self, *args, **kwargs): + deproxy.Deproxy.__init__(self, *args, **kwargs) + self.used_connection = None + self.store_failovered = False + + def run(self): + # Run loop to setup all the connections + self.loop(0.1) + self.used_connection = None + self.store_failovered = True + deproxy.Deproxy.run(self) + self.store_failovered = False + + def register_srv_connection(self, connection): + # Since only one server respond all requests with only one connection, + # this new connection is failovered used one. Keep it. 
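+ # In the failovered variant the single server is started with
+ # keep_alive = messages // 10, so it drops the connection every few
+ # responses and the hash scheduler has to pin the same URI to the
+ # re-established connection.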
+ if self.store_failovered: + self.used_connection = connection + deproxy.Deproxy.register_srv_connection(self, connection) + + def recieved_forwarded_request(self, request, connection): + if not self.used_connection: + self.used_connection = connection + else: + assert self.used_connection is connection + return deproxy.Deproxy.recieved_forwarded_request(self, request, + connection) diff --git a/tempesta_fw/t/functional/sched/test_hash_stress.py b/tempesta_fw/t/functional/sched/test_hash_stress.py new file mode 100644 index 0000000000..3e84641035 --- /dev/null +++ b/tempesta_fw/t/functional/sched/test_hash_stress.py @@ -0,0 +1,44 @@ +""" +Hash scheduler pins resourses to specific servers and connections. Stress +test. Can't track server connections here, but real HTTP servers and clients +are used. +""" + +import unittest +from helpers import tempesta +from testers import stress + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + + +class BindToServer(stress.StressTest): + """ Hash scheduler binds URIs to specific connections, so only one server + must pull all the load if we try to get the same resource over and over + again. + """ + + def create_servers(self): + self.create_servers_helper(tempesta.servers_in_group()) + + def configure_tempesta(self): + """Configure Tempesta to use hash scheduler instead of default one. + """ + stress.StressTest.configure_tempesta(self) + for sg in self.tempesta.config.server_groups: + sg.sched = 'hash' + + def assert_servers(self): + self.servers_get_stats() + # Only one server must pull all the load. + loaded = 0 + for s in self.servers: + if s.requests: + loaded += 1 + self.assertEqual(self.tempesta.stats.cl_msg_forwarded, + s.requests) + self.assertEqual(loaded, 1) + + def test_hash(self): + self.generic_test_routine('cache 0;\n') diff --git a/tempesta_fw/t/functional/sched/test_http.py b/tempesta_fw/t/functional/sched/test_http.py new file mode 100644 index 0000000000..1c90cb3257 --- /dev/null +++ b/tempesta_fw/t/functional/sched/test_http.py @@ -0,0 +1,224 @@ +""" +Test fo http scheduler: +""" + +from __future__ import print_function +import asyncore +from helpers import tempesta, deproxy, tf_cfg +from testers import functional + +class HttpRules(functional.FunctionalTest): + """All requests must be forwarded to the right server groups according to + sched_http_rules. + """ + + requests_n = 20 + + config = ( + 'cache 0;\n' + '\n' + 'sched_http_rules {\n' + ' match uri_p uri prefix "/static";\n' + ' match uri_s uri suffix ".php";\n' + ' match host_p host prefix "static.";\n' + ' match host_s host suffix "tempesta-tech.com";\n' + ' match host_e host eq "foo.example.com";\n' + ' match hdr_h_p hdr_host prefix "bar.";\n' + ' match hdr_h_e hdr_host eq "buzz.natsys-lab.com";\n' + ' match hdr_h_s hdr_host suffix "natsys-lab.com";\n' + '}\n' + '\n') + + def make_chains(self, uri, extra_header=(None, None)): + chain = functional.base_message_chain(uri=uri) + + header, value = extra_header + if not header is None: + for req in [chain.request, chain.fwd_request]: + req.headers.delete_all(header) + req.headers.add(header, value) + req.update() + + return [chain for i in range(self.requests_n)] + + def create_client(self): + # Client will be created for every server. 
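+ # One deproxy client per server group: each client sends the message
+ # chain built for its group, so a rule counts as matched only when the
+ # request reaches that group's server.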
+ for server in self.servers: + server.client = deproxy.Client() + + def create_servers(self): + port=tempesta.upstream_port_start_from() + server_options = [ + (('uri_p'), ('/static/index.html'), None, None), + (('uri_s'), ('/script.php'), None, None), + (('host_p'), ('/'), ('host'), ('static.example.com')), + (('host_s'), ('/'), ('host'), ('s.tempesta-tech.com')), + (('host_e'), ('/'), ('host'), ('foo.example.com')), + (('hdr_h_p'), ('/'), ('host'), ('bar.example.com')), + (('hdr_h_s'), ('/'), ('host'), ('test.natsys-lab.com')), + (('hdr_h_e'), ('/'), ('host'), ('buzz.natsys-lab.com')), + (('default'), ('/'), None, None)] + + for group, uri, header, value in server_options: + # Dont need too lot connections here. + server = deproxy.Server(port=port, connections=1) + port += 1 + server.group = group + server.chains = self.make_chains(uri=uri, + extra_header=(header, value)) + self.servers.append(server) + + def configure_tempesta(self): + """ Add every server to it's own server group with default scheduler. + """ + # We run server on the Client host. + ip = tf_cfg.cfg.get('Client', 'ip') + for s in self.servers: + sg = tempesta.ServerGroup(s.group) + sg.add_server(ip, s.port, s.conns_n) + self.tempesta.config.add_sg(sg) + + def create_testers(self): + self.testers = [ + HttpSchedTester(server.chains, server.client, [server]) + for server in self.servers] + for tester in self.testers: + tester.response_cb = self.response_recieved + + def routine(self): + for i in range(self.requests_n): + self.responses_recieved = 0 + for tester in self.testers: + tester.configure(i) + # Run asyncore loop with default timeout + self.testers[0].loop() + for tester in self.testers: + tester.check_expectations() + + def init(self): + self.tempesta.config.set_defconfig(self.config) + + self.create_servers() + self.configure_tempesta() + + self.tempesta.start() + self.create_client() + + self.create_testers() + + def test_scheduler(self): + self.init() + self.routine() + + self.tempesta.get_stats() + self.assert_tempesta() + + def response_recieved(self): + self.responses_recieved += 1 + if self.responses_recieved == len(self.servers): + raise asyncore.ExitNow + + def setUp(self): + self.testers = [] + functional.FunctionalTest.setUp(self) + + def tearDown(self): + if self.tempesta: + self.tempesta.stop() + for tester in self.testers: + tester.close_all() + + +class HttpRulesBackupServers(HttpRules): + + config = ( + 'cache 0;\n' + '\n' + 'sched_http_rules {\n' + ' match default * * * backup=backup;\n' + '}\n' + '\n') + + def make_chains(self, empty=True): + chain = None + if empty: + chain = deproxy.MessageChain.empty() + else: + chain = functional.base_message_chain() + return [chain for i in range(self.requests_n)] + + def create_server_helper(self, group, port): + server = deproxy.Server(port=port, connections=1) + server.group = group + server.chains = self.make_chains() + return server + + def create_servers(self): + port=tempesta.upstream_port_start_from() + for group in ['default', 'backup']: + server = self.create_server_helper(group, port) + port += 1 + if group == 'default': + self.main_server = server + else: + self.backup_server = server + self.servers.append(server) + + def test_scheduler(self): + self.init() + # Main server is online, backup server must not recieve traffic. + self.main_server.tester.message_chains = ( + self.make_chains(empty=False)) + self.routine() + + # Shutdown main server, responses must be frowarded to backup. 
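+ # With 'backup=backup' in the rule the backup group is expected to get
+ # traffic only while the primary group has no live connections.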
+ self.main_server.tester.message_chains = ( + self.make_chains(empty=True)) + self.main_server.tester.close_all() + self.backup_server.tester.message_chains = ( + self.make_chains(empty=False)) + self.routine() + + # Return main server back operational. + self.testers.remove(self.main_server.tester) + self.main_server = self.create_server_helper( + group=self.main_server.group, port=self.main_server.port) + tester = HttpSchedTester(self.make_chains(empty=False), + deproxy.Client(), [self.main_server]) + tester.response_cb = self.response_recieved + self.testers.append(tester) + self.backup_server.tester.message_chains = ( + self.make_chains(empty=True)) + + self.routine() + + # Check tempesta for no errors + self.tempesta.get_stats() + self.assert_tempesta() + + def response_recieved(self): + self.responses_recieved += 1 + if self.responses_recieved == 1: + raise asyncore.ExitNow + + +class HttpSchedTester(deproxy.Deproxy): + + def __init__(self, *args, **kwargs): + deproxy.Deproxy.__init__(self, *args, **kwargs) + + def configure(self, chain_n): + if chain_n in range(len(self.message_chains)): + self.current_chain = self.message_chains[chain_n] + else: + self.current_chain = deproxy.MessageChain.empty() + + self.recieved_chain = deproxy.MessageChain.empty() + self.client.clear() + self.client.set_request(self.current_chain.request) + + def recieved_response(self, response): + # A lot of clients running, dont raise asyncore.ExitNow directly + # instead call the + self.recieved_chain.response = response + self.response_cb() diff --git a/tempesta_fw/t/functional/sched/test_rr.py b/tempesta_fw/t/functional/sched/test_rr.py new file mode 100644 index 0000000000..f81a9f1ced --- /dev/null +++ b/tempesta_fw/t/functional/sched/test_rr.py @@ -0,0 +1,55 @@ +""" +Round-robin scheduler is fast and fair scheduler. First it choses server in +round-robin manner, then does the same to choose connection to server. Load of +all servers is quite identical even for servers with different connections +count. +""" + +import unittest +import math +import random +from helpers import tempesta +from testers import stress + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + + +class FairLoadEqualConns(stress.StressTest): + """ Round-Robin scheduler loads all the upstream servers in the fair + way. In this test servers have the same connections count. + """ + + # Precision of fair loading. + precision = 0.02 + + def create_servers(self): + self.create_servers_helper(tempesta.servers_in_group()) + + def assert_servers(self): + self.servers_get_stats() + cl_reqs = self.tempesta.stats.cl_msg_forwarded + s_reqs_expected = cl_reqs / len(self.servers) + s_reqs = 0 + for s in self.servers: + self.assertTrue(math.fabs(s.requests - s_reqs_expected) < + (self.precision * s_reqs_expected)) + s_reqs += s.requests + self.assertEqual(s_reqs, self.tempesta.stats.cl_msg_forwarded) + + def test_rr(self): + self.generic_test_routine('cache 0;\n') + + +class FairLoadRandConns(FairLoadEqualConns): + """ Same as FairLoadEqualConns, but in this test servers have random + connections count. Roun-robin scheduler still distributes load uniformely + arcross all the servers. 
+ """ + + def create_servers(self): + """ Save number of connections to each upstream server """ + FairLoadEqualConns.create_servers(self) + for s in self.servers: + s.conns_n = random.randrange(1, tempesta.server_conns_max()) diff --git a/tempesta_fw/t/functional/selftests/__init__.py b/tempesta_fw/t/functional/selftests/__init__.py new file mode 100644 index 0000000000..815586ed23 --- /dev/null +++ b/tempesta_fw/t/functional/selftests/__init__.py @@ -0,0 +1,2 @@ +__all__ = ['test_requests', 'test_responses', 'test_headercollection', + 'test_deproxy'] diff --git a/tempesta_fw/t/functional/selftests/test_deproxy.py b/tempesta_fw/t/functional/selftests/test_deproxy.py new file mode 100644 index 0000000000..6e353c5d64 --- /dev/null +++ b/tempesta_fw/t/functional/selftests/test_deproxy.py @@ -0,0 +1,90 @@ +from __future__ import print_function +import unittest +from helpers import deproxy, tf_cfg, tempesta +from testers import functional + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +def sample_rule(): + return functional.base_message_chain() + +def sample_rule_chunked(): + return functional.base_message_chain_chunked() + +def defconfig(): + return 'cache 0;\n' + + +class DeproxyDummyTest(functional.FunctionalTest): + """Test Deproxy, don't even start or setup TempestaFw in this test.""" + + def setUp(self): + self.client = None + self.servers = [] + tf_cfg.dbg(3) # Step to the next line after name of test case. + tf_cfg.dbg(3, '\tInit test case...') + + def tearDown(self): + if self.client: + self.client.close() + if self.tester: + self.tester.close_all() + + def create_clients(self): + port = tempesta.upstream_port_start_from() + self.client = deproxy.Client(port=port, host='Client') + + def create_servers(self): + port = tempesta.upstream_port_start_from() + self.servers = [deproxy.Server(port=port, connections=1)] + + def routine(self, message_chains): + self.create_servers() + self.create_clients() + self.create_tester(message_chains) + self.tester.run() + + def test_deproxy_one_chain(self): + chain = sample_rule() + # In this test we do not have proxy + chain.response = chain.server_response + chain.fwd_request = chain.request + + message_chains = [chain] + self.routine(message_chains) + + +class DeproxyTest(functional.FunctionalTest): + + def test_deproxy_one_chain(self): + message_chains = [sample_rule()] + self.generic_test_routine(defconfig(), message_chains) + + +class DeproxyChunkedTest(functional.FunctionalTest): + + def test_deproxy_one_chain(self): + message_chains = [sample_rule_chunked()] + self.generic_test_routine(defconfig(), message_chains) + + +class DeproxyTestFailOver(DeproxyTest): + + def create_servers(self): + port = tempesta.upstream_port_start_from() + self.servers = [deproxy.Server(port=port, keep_alive=1)] + + def create_tester(self, message_chain): + + class DeproxyFailOver(deproxy.Deproxy): + def check_expectations(self): + # We closed server connection after response. Tempesta must + # failover the connection. Run loop with small timeout + # once again to pocess events. + self.loop(0.1) + assert self.is_srvs_ready(), 'Failovering failed!' 
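+ # The server is started with keep_alive=1, so it drops the
+ # connection after every response and each chain exercises
+ # failovering before expectations are checked.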
+ deproxy.Deproxy.check_expectations(self) + + self.tester = DeproxyFailOver(message_chain, self.client, self.servers) diff --git a/tempesta_fw/t/functional/selftests/test_deproxy_message.py b/tempesta_fw/t/functional/selftests/test_deproxy_message.py new file mode 100644 index 0000000000..386dbb2765 --- /dev/null +++ b/tempesta_fw/t/functional/selftests/test_deproxy_message.py @@ -0,0 +1,213 @@ +from __future__ import print_function +import unittest +import asyncore +from helpers import deproxy + +class TestDeproxyMessage(unittest.TestCase): + + def test_incomplite(self): + message_1 = "HTTP/1.1 20" + message_2 = ("HTTP/1.1 200 OK\r\n" + "Date: Mon, 23 May 2005 22:38:34 GMT\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "Content-Enco") + message_3 = ("HTTP/1.1 200 OK\r\n" + "Date: Mon, 23 May 2005 22:38:34 GMT\r\n" + "Content-Type: text/html; charset=UTF-8\r\n") + message_4 = ("HTTP/1.1 200 OK\r\n" + "Date: Mon, 23 May 2005 22:38:34 GMT\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "Content-Encoding: UTF-8\r\n" + "Transfer-Encoding: compress, gzip, chunked\r\n" + "Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT\r\n" + "Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux)\r\n" + """ETag: "3f80f-1b6-3e1cb03b"\r\n""" + "Accept-Ranges: bytes\r\n" + "Connection: close\r\n" + "\r\n") + message_5 = ("HTTP/1.1 200 OK\r\n" + "Date: Mon, 23 May 2005 22:38:34 GMT\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "Content-Encoding: UTF-8\r\n" + "Transfer-Encoding: compress, gzip, chunked\r\n" + "Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT\r\n" + "Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux)\r\n" + """ETag: "3f80f-1b6-3e1cb03b"\r\n""" + "Accept-Ranges: bytes\r\n" + "Connection: close\r\n" + "\r\n" + "6\r\n" + "\r\n" + "0\r\n" + "Expires: Wed, 21 Oct 2015 07:28:00 GMT") + message_6 = ("HTTP/1.1 200 OK\r\n" + "Date: Mon, 23 May 2005 22:38:34 GMT\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "Content-Encoding: UTF-8\r\n" + "Content-Length: 1000\r\n" + "Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT\r\n" + "Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux)\r\n" + """ETag: "3f80f-1b6-3e1cb03b"\r\n""" + "Accept-Ranges: bytes\r\n" + "Connection: close\r\n" + "\r\n" + "\r\n") + incomplite = [(message_1, 'header'), + (message_2, 'header'), + (message_3, 'header: no CRLF'), + (message_4, 'body: no last-chunk'), + (message_5, 'trailer: no CRLF'), + (message_6, 'body: too short')] + for message, reason in incomplite: + msg = ('Message parsed, but it has incomplite %s. 
Message:\n%s' + % (reason, message)) + parsed = True + try: + deproxy.Response(message) + except deproxy.ParseError: + parsed = False + self.assertFalse(parsed, msg) + + def test_valid(self): + message_1 = ("HTTP/1.1 200 OK\r\n" + "Date: Mon, 23 May 2005 22:38:34 GMT\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "\r\n") + message_2 = ("HTTP/1.1 200 OK\r\n" + "Date: Mon, 23 May 2005 22:38:34 GMT\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "Content-Encoding: UTF-8\r\n" + "Transfer-Encoding: compress, gzip, chunked\r\n" + "Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT\r\n" + "Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux)\r\n" + """ETag: "3f80f-1b6-3e1cb03b"\r\n""" + "Accept-Ranges: bytes\r\n" + "Connection: close\r\n" + "\r\n" + "0\r\n" + "\r\n") + message_3 = ("HTTP/1.1 200 OK\r\n" + "Date: Mon, 23 May 2005 22:38:34 GMT\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "Content-Encoding: UTF-8\r\n" + "Transfer-Encoding: compress, gzip, chunked\r\n" + "Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT\r\n" + "Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux)\r\n" + """ETag: "3f80f-1b6-3e1cb03b"\r\n""" + "Accept-Ranges: bytes\r\n" + "Connection: close\r\n" + "\r\n" + "6\r\n" + "\r\n" + "0\r\n" + "\r\n") + message_4 = ("HTTP/1.1 200 OK\r\n" + "Date: Mon, 23 May 2005 22:38:34 GMT\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "Content-Encoding: UTF-8\r\n" + "Transfer-Encoding: compress, gzip, chunked\r\n" + "Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT\r\n" + "Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux)\r\n" + """ETag: "3f80f-1b6-3e1cb03b"\r\n""" + "Accept-Ranges: bytes\r\n" + "Connection: close\r\n" + "\r\n" + "6\r\n" + "\r\n" + "0\r\n" + "\r\n" + "Expires: Wed, 21 Oct 2015 07:28:00 GMT\r\n" + "\r\n") + message_5 = ("HTTP/1.1 200 OK\r\n" + "Date: Mon, 23 May 2005 22:38:34 GMT\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "Content-Encoding: UTF-8\r\n" + "Content-Length: 6\r\n" + "Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT\r\n" + "Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux)\r\n" + """ETag: "3f80f-1b6-3e1cb03b"\r\n""" + "Accept-Ranges: bytes\r\n" + "Connection: close\r\n" + "\r\n" + "" + "\r\n") + message_6 = ("HTTP/1.1 200 OK\r\n" + "Date: Mon, 23 May 2005 22:38:34 GMT\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "Content-Encoding: UTF-8\r\n" + "Content-Length: 0\r\n" + "Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT\r\n" + "Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux)\r\n" + """ETag: "3f80f-1b6-3e1cb03b"\r\n""" + "Accept-Ranges: bytes\r\n" + "Connection: close\r\n" + "\r\n") + valid_messages = [message_1, message_2, message_3, message_4, message_5, + message_6] + for message in valid_messages: + try: + deproxy.Response(message) + except deproxy.ParseError: + print('Error happen when processed message\n%s' % message) + raise + + def test_request_plain(self): + request = deproxy.Request( + "GET / HTTP/1.1\r\n" + "Host: 10.0.10.2\r\n" + "User-Agent: curl/7.53.1\r\n" + "Accept: */*\r\n" + "\r\n") + headers = ['Host: 10.0.10.2', 'User-Agent: curl/7.53.1', 'Accept: */*'] + created = deproxy.Request.create('GET', headers) + self.assertEqual(request, created) + + def test_request_body(self): + request = deproxy.Request( + "GET / HTTP/1.1\r\n" + "Host: 10.0.10.2\r\n" + "User-Agent: curl/7.53.1\r\n" + "Accept: */*\r\n" + "Content-Length: 6\r\n" + "\r\n" + "" + "\r\n") + headers = ['Host: 10.0.10.2', 'User-Agent: curl/7.53.1', 'Accept: */*', + 'Content-Length: 6'] + body = "\r\n" + created = deproxy.Request.create('GET', headers, body=body) + 
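+ # Request.create() assembles a message from the method, a header list
+ # and an optional body; the result must compare equal to the same
+ # message parsed from raw text.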
self.assertEqual(request, created) + + def test_response_plain(self): + response = deproxy.Response( + "HTTP/1.1 200 OK\r\n" + "Server: SimpleHTTP/0.6 Python/3.6.0\r\n" + "Content-type: text/html\r\n" + "Content-Length: 138\r\n" + "Last-Modified: Mon, 12 Dec 2016 13:59:39 GMT\r\n" + "\r\n" + "\r\n" + "\r\n" + " An Example Page\r\n" + "\r\n" + "\r\n" + " Hello World, this is a very simple HTML document.\r\n" + "\r\n" + "\r\n" + ) + headers = [ + 'Server: SimpleHTTP/0.6 Python/3.6.0', + 'Content-type: text/html', + 'Content-Length: 138', + 'Last-Modified: Mon, 12 Dec 2016 13:59:39 GMT'] + body = ( + "\r\n" + "\r\n" + " An Example Page\r\n" + "\r\n" + "\r\n" + " Hello World, this is a very simple HTML document.\r\n" + "\r\n" + "\r\n") + created = deproxy.Response.create(200, headers, body=body) + self.assertEqual(response, created) diff --git a/tempesta_fw/t/functional/selftests/test_headercollection.py b/tempesta_fw/t/functional/selftests/test_headercollection.py new file mode 100644 index 0000000000..6d7734fe29 --- /dev/null +++ b/tempesta_fw/t/functional/selftests/test_headercollection.py @@ -0,0 +1,136 @@ +from __future__ import print_function +import unittest +from StringIO import StringIO +from helpers import deproxy + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class TestHeaderCollection(unittest.TestCase): + def setUp(self): + self.headers = deproxy.HeaderCollection() + + def test_length(self): + self.assertEqual(len(self.headers), 0) + self.headers.add('Name', 'Value') + self.assertEqual(len(self.headers), 1) + + def test_add(self): + self.headers.add('Name', 'Value') + self.assertIn('name', self.headers) + self.assertIn('Name', self.headers) + + def test_find_all(self): + self.headers.add('A', 'qwerty') + self.headers.add('B', 'asdf') + self.headers.add('C', 'zxcv') + self.headers.add('A', 'uiop') + self.headers.add('A', 'jkl;') + + result = [value for value in self.headers.find_all('A')] + self.assertEqual(result, ['qwerty', 'uiop', 'jkl;']) + + result = [value for value in self.headers.find_all('a')] + self.assertEqual(result, ['qwerty', 'uiop', 'jkl;']) + + def test_bracket_case(self): + self.headers.add('Name', 'Value') + + try: + self.assertEqual(self.headers['name'], 'Value') + self.assertEqual(self.headers['Name'], 'Value') + except: + self.fail() + + def test_get(self): + self.headers.add('Name', 'Value') + + self.assertEqual(self.headers.get('Name'), 'Value') + self.assertEqual(self.headers.get('name'), 'Value') + self.assertIsNone(self.headers.get('asdf')) + self.assertEqual(self.headers.get('name', default='zxcv'), 'Value') + self.assertEqual(self.headers.get('asdf', default='zxcv'), 'zxcv') + + def test_keys(self): + self.headers.add('A', 'qwerty') + self.headers.add('B', 'asdf') + self.headers.add('C', 'zxcv') + self.headers.add('A', 'uiop') + self.headers.add('A', 'jkl;') + + self.assertEqual(set(self.headers.keys()), set(['a', 'b', 'c'])) + + def test_from_stream(self): + test_headers = [('User-Agent', 'Wget/1.13.4 (linux-gnu)'), + ('Accept', '*/*'), + ('Host', ' localhost '), + ('Connection', ' Keep-Alive'), + ('X-Custom-Hdr', 'custom header values'), + ('x-custom-hdr', 'custom header values 2'), + ('X-Forwarded-For', '127.0.0.1, example.com'), + ('Content-Type', 'text/html; charset=iso-8859-1'), + ('Cache-Control', 'max-age=1, no-store, min-fresh=30'), + ('Pragma', 'no-cache, fooo'), + ('Transfer-Encoding', 'compress, gzip, chunked'), + ('Cookie', 'session=42; theme=dark')] + 
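+ # Serialise the headers into a raw block terminated by an empty line,
+ # as they would appear on the wire, and parse them back from a stream.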
text = '\r\n'.join(['%s: %s' % header for header in test_headers] + + ['\r\n']) + + stream = StringIO(text) + parsed_headers = deproxy.HeaderCollection.from_stream(stream) + self.assertEqual(len(parsed_headers), len(test_headers)) + + for header, value in test_headers: + if header.lower() == 'x-custom-hdr': + continue + self.assertEqual(parsed_headers[header], value.strip()) + self.assertEqual(parsed_headers[header.lower()], value.strip()) + + for header in ['X-Custom-Hdr', 'x-custom-hdr']: + self.assertEqual( + set(parsed_headers.find_all(header)), + set(['custom header values', 'custom header values 2'])) + + expect_headers = [ (header.strip(), value.strip()) + for (header, value) in test_headers] + self.assertEqual(expect_headers, parsed_headers.items()) + + def test_is_equal(self): + self.headers.add('A', 'qwerty') + self.headers.add('B', 'asdf') + self.headers.add('C', 'zxcv') + self.headers.add('A', 'uiop') + self.headers.add('A', 'jkl;') + + reorderd = deproxy.HeaderCollection() + reorderd.add('C', 'zxcv') + reorderd.add('A', 'uiop') + reorderd.add('A', 'jkl;') + reorderd.add('A', 'qwerty') + reorderd.add('B', 'asdf') + self.assertTrue(self.headers == reorderd) + self.assertFalse(self.headers != reorderd) + + other = deproxy.HeaderCollection() + other.add('C', 'zxcv') + other.add('A', 'uiop') + other.add('A', 'jkl;') + self.assertTrue(self.headers != other) + self.assertFalse(self.headers == other) + + same_keys = deproxy.HeaderCollection() + same_keys.add('C', 'zxcv') + same_keys.add('B', 'uiop') + same_keys.add('A', 'jkl;') + self.assertTrue(self.headers != same_keys) + self.assertFalse(self.headers == same_keys) + + lowed = deproxy.HeaderCollection() + lowed.add('c', 'zxcv') + lowed.add('a', 'uiop') + lowed.add('A', 'jkl;') + lowed.add('a', 'qwerty') + lowed.add('b', 'asdf') + self.assertTrue(self.headers == lowed) + self.assertFalse(self.headers != lowed) diff --git a/tempesta_fw/t/functional/selftests/test_requests.py b/tempesta_fw/t/functional/selftests/test_requests.py new file mode 100644 index 0000000000..2f1cf36f68 --- /dev/null +++ b/tempesta_fw/t/functional/selftests/test_requests.py @@ -0,0 +1,98 @@ +from __future__ import print_function +import unittest +from helpers import deproxy + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class ParseRequest(unittest.TestCase): + + def setUp(self): + self.plain = deproxy.Request(PLAIN) + self.reordered = deproxy.Request(REORDERED) + self.duplicated = deproxy.Request(DUPLICATED) + + def test_equal(self): + # Reordering of headers is allowed. 
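+ # An extra duplicated header, however, makes the messages differ.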
+ self.assertTrue(self.plain == self.reordered) + self.assertFalse(self.plain != self.reordered) + + self.assertFalse(self.plain == self.duplicated) + self.assertTrue(self.plain != self.duplicated) + + self.assertFalse(self.reordered == self.duplicated) + self.assertTrue(self.reordered != self.duplicated) + + def test_parse(self): + self.assertEqual(self.plain.method, 'GET') + self.assertEqual(self.plain.uri, '/foo') + self.assertEqual(self.plain.version, 'HTTP/1.1') + + headers = [('User-Agent', 'Wget/1.13.4 (linux-gnu)'), + ('Accept', '*/*'), + ('Host', 'localhost'), + ('Connection', 'Keep-Alive'), + ('X-Custom-Hdr', 'custom header values'), + ('X-Forwarded-For', '127.0.0.1, example.com'), + ('Content-Type', 'text/html; charset=iso-8859-1'), + ('Cache-Control', 'max-age=1, no-store, min-fresh=30'), + ('Pragma', 'no-cache, fooo'), + ('Cookie', 'session=42; theme=dark'), + ('Authorization', 'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==')] + for header, value in headers: + self.assertEqual(self.plain.headers[header], value.strip()) + + self.assertEqual(self.plain.body, '') + + +PLAIN = """GET /foo HTTP/1.1 +User-Agent: Wget/1.13.4 (linux-gnu) +Accept: */* +Host: localhost +Connection: Keep-Alive +X-Custom-Hdr: custom header values +X-Forwarded-For: 127.0.0.1, example.com +Content-Type: text/html; charset=iso-8859-1 +Cache-Control: max-age=1, no-store, min-fresh=30 +Pragma: no-cache, fooo +Cookie: session=42; theme=dark +Authorization: Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ== + +""" + +# Reordered: +REORDERED = """GET /foo HTTP/1.1 +User-Agent: Wget/1.13.4 (linux-gnu) +Accept: */* +Host: localhost +Cache-Control: max-age=1, no-store, min-fresh=30 +Connection: Keep-Alive +X-Custom-Hdr: custom header values +X-Forwarded-For: 127.0.0.1, example.com +Content-Type: text/html; charset=iso-8859-1 +Pragma: no-cache, fooo +Cookie: session=42; theme=dark +Authorization: Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ== + +""" + +# With duplicated header: +DUPLICATED = """GET /foo HTTP/1.1 +User-Agent: Wget/1.13.4 (linux-gnu) +Accept: */* +Host: localhost +Connection: Keep-Alive +X-Custom-Hdr: custom header values +X-Forwarded-For: 127.0.0.1, example.com +Content-Type: text/html; charset=iso-8859-1 +Cache-Control: max-age=1, no-store, min-fresh=30 +Pragma: no-cache, fooo +Cookie: session=42; theme=dark +Authorization: Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ== +X-Custom-Hdr: other custom header values + +""" + +if __name__ == '__main__': + unittest.main() diff --git a/tempesta_fw/t/functional/selftests/test_responses.py b/tempesta_fw/t/functional/selftests/test_responses.py new file mode 100644 index 0000000000..1417765cba --- /dev/null +++ b/tempesta_fw/t/functional/selftests/test_responses.py @@ -0,0 +1,363 @@ +from __future__ import print_function +import unittest +from helpers import deproxy + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class ParseResponse(unittest.TestCase): + + def setUp(self): + self.plain = deproxy.Response(PLAIN) + self.reordered = deproxy.Response(REORDERED) + self.o_body = deproxy.Response(OTHER_BODY) + self.duplicated = deproxy.Response(DUPLICATED) + self.o_status = deproxy.Response(OTHER_STATUS) + + self.trailer = deproxy.Response(TRAILER) + self.o_trailer = deproxy.Response(OTHER_TRAILER) + + def test_equal(self): + # Reordering of headers is allowed. 
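+ # Responses also differ by body, status code and trailer headers,
+ # see the fixtures below.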
+ self.assertTrue(self.plain == self.reordered) + self.assertFalse(self.plain != self.reordered) + + for resp in [self.o_body, self.duplicated, self.o_status]: + self.assertFalse(self.plain == resp) + self.assertTrue(self.plain != resp) + + for resp in [self.o_body, self.duplicated, self.o_status]: + self.assertFalse(self.reordered == resp) + self.assertTrue(self.reordered != resp) + + for resp in [self.duplicated, self.o_status]: + self.assertFalse(self.o_body == resp) + self.assertTrue(self.o_body != resp) + + self.assertFalse(self.duplicated == self.o_status) + self.assertTrue(self.duplicated != self.o_status) + + self.assertFalse(self.trailer == self.o_trailer) + self.assertTrue(self.trailer != self.o_trailer) + + def test_parse(self): + self.assertEqual(self.plain.status, '200') + self.assertEqual(self.plain.reason, 'OK') + self.assertEqual(self.plain.version, 'HTTP/1.1') + + headers = [('Date', 'Mon, 23 May 2005 22:38:34 GMT'), + ('Content-Type', 'text/html; charset=UTF-8'), + ('Content-Encoding', 'UTF-8'), + ('Content-Length', '130'), + ('Last-Modified', 'Wed, 08 Jan 2003 23:11:55 GMT'), + ('Server', 'Apache/1.3.3.7 (Unix) (Red-Hat/Linux)'), + ('ETag', '"3f80f-1b6-3e1cb03b"'), + ('Accept-Ranges', 'bytes'), + ('Connection', 'close')] + for header, value in headers: + self.assertEqual(self.plain.headers[header], value.strip()) + + self.assertEqual(self.plain.body, + ("\n" + "\n" + " An Example Page\n" + "\n" + "\n" + " Hello World, this is a very simple HTML document.\n" + "\n" + "\n")) + + +PLAIN = """HTTP/1.1 200 OK +Date: Mon, 23 May 2005 22:38:34 GMT +Content-Type: text/html; charset=UTF-8 +Content-Encoding: UTF-8 +Content-Length: 130 +Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT +Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux) +ETag: "3f80f-1b6-3e1cb03b" +Accept-Ranges: bytes +Connection: close + + + + An Example Page + + + Hello World, this is a very simple HTML document. + + + +""" + +# Reordered: +REORDERED = """HTTP/1.1 200 OK +Date: Mon, 23 May 2005 22:38:34 GMT +Content-Type: text/html; charset=UTF-8 +Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT +Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux) +ETag: "3f80f-1b6-3e1cb03b" +Accept-Ranges: bytes +Connection: close +Content-Encoding: UTF-8 +Content-Length: 130 + + + + An Example Page + + + Hello World, this is a very simple HTML document. + + + +""" + +# With other body: +OTHER_BODY = """HTTP/1.1 200 OK +Date: Mon, 23 May 2005 22:38:34 GMT +Content-Type: text/html; charset=UTF-8 +Content-Encoding: UTF-8 +Content-Length: 130 +Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT +Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux) +ETag: "3f80f-1b6-3e1cb03b" +Accept-Ranges: bytes +Connection: close + + + + An EXAMPLE Page + + + Hello World, this is a very simple HTML document. + + +""" + +# With duplicated header: +DUPLICATED = """HTTP/1.1 200 OK +Date: Mon, 23 May 2005 22:38:34 GMT +Content-Type: text/html; charset=UTF-8 +Content-Encoding: UTF-8 +Content-Length: 130 +Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT +Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux) +ETag: "3f80f-1b6-3e1cb03b" +Accept-Ranges: bytes +Connection: close +Connection: aloha + + + + An Example Page + + + Hello World, this is a very simple HTML document. 
+ + + +""" + + +# With other status: +OTHER_STATUS = """HTTP/1.1 302 OK +Date: Mon, 23 May 2005 22:38:34 GMT +Content-Type: text/html; charset=UTF-8 +Content-Encoding: UTF-8 +Content-Length: 130 +Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT +Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux) +ETag: "3f80f-1b6-3e1cb03b" +Accept-Ranges: bytes +Connection: close + + + + An Example Page + + + Hello World, this is a very simple HTML document. + + + +""" + +TRAILER = """HTTP/1.1 200 OK +Date: Mon, 23 May 2005 22:38:34 GMT +Content-Type: text/html; charset=UTF-8 +Content-Encoding: UTF-8 +Transfer-Encoding: compress, gzip, chunked +Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT +Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux) +ETag: "3f80f-1b6-3e1cb03b" +Accept-Ranges: bytes +Connection: close + +4 +1234 +0 + +Expires: Wed, 21 Oct 2015 07:28:00 GMT +""" + +OTHER_TRAILER = """HTTP/1.1 200 OK +Date: Mon, 23 May 2005 22:38:34 GMT +Content-Type: text/html; charset=UTF-8 +Content-Encoding: UTF-8 +Transfer-Encoding: compress, gzip, chunked +Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT +Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux) +ETag: "3f80f-1b6-3e1cb03b" +Accept-Ranges: bytes +Connection: close + +4 +1234 +0 + +Expires: Wed, 21 Dec 2015 07:28:00 GMT +""" + +class ParseBody(unittest.TestCase): + + def default_body(self): + return ("\n" + "\n" + " An Example Page\n" + "\n" + "\n" + " Hello World, this is a very simple HTML document.\n" + "\n" + "\n") + + def chunked_body(self): + return ("4\n" + "1234\n" + "0\n" + "\n") + + def try_body(self, response_text, body_text, trailer_headers=None): + response = deproxy.Response(response_text) + self.assertEqual(response.body, body_text) + if not trailer_headers: + self.assertEqual(len(response.trailer), 0) + else: + for header, value in trailer_headers: + self.assertEqual(response.trailer[header], value.strip()) + + def test_chunked_empty(self): + self.try_body(PARSE_CHUNKED_EMPTY, '0\n\n') + + def test_chunked(self): + self.try_body(PARSE_CHUNKED, self.chunked_body()) + + def test_chunked_and_trailer(self): + self.try_body(PARSE_CHUNKED_AND_TRAILER, self.chunked_body(), + [('Expires', 'Wed, 21 Oct 2015 07:28:00 GMT')]) + + def test_contentlength(self): + self.try_body(PARSE_CONTENT_LENGTH, self.default_body()) + + def test_contentlength_too_short(self): + with self.assertRaises(deproxy.ParseError): + self.try_body(PARSE_CONTENT_LENGTH_TOO_SHORT, '') + + +PARSE_CHUNKED_EMPTY = """HTTP/1.1 200 OK +Date: Mon, 23 May 2005 22:38:34 GMT +Content-Type: text/html; charset=UTF-8 +Content-Encoding: UTF-8 +Transfer-Encoding: compress, gzip, chunked +Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT +Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux) +ETag: "3f80f-1b6-3e1cb03b" +Accept-Ranges: bytes +Connection: close + +0 + +""" + +PARSE_CHUNKED = """HTTP/1.1 200 OK +Date: Mon, 23 May 2005 22:38:34 GMT +Content-Type: text/html; charset=UTF-8 +Content-Encoding: UTF-8 +Transfer-Encoding: compress, gzip, chunked +Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT +Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux) +ETag: "3f80f-1b6-3e1cb03b" +Accept-Ranges: bytes +Connection: close + +4 +1234 +0 + +""" + +PARSE_CHUNKED_AND_TRAILER = """HTTP/1.1 200 OK +Date: Mon, 23 May 2005 22:38:34 GMT +Content-Type: text/html; charset=UTF-8 +Content-Encoding: UTF-8 +Transfer-Encoding: compress, gzip, chunked +Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT +Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux) +ETag: "3f80f-1b6-3e1cb03b" +Accept-Ranges: bytes +Connection: close + +4 +1234 +0 + +Expires: Wed, 21 Oct 2015 
07:28:00 GMT
+"""
+
+PARSE_CONTENT_LENGTH = """HTTP/1.1 200 OK
+Date: Mon, 23 May 2005 22:38:34 GMT
+Content-Type: text/html; charset=UTF-8
+Content-Encoding: UTF-8
+Content-Length: 130
+Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT
+Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux)
+ETag: "3f80f-1b6-3e1cb03b"
+Accept-Ranges: bytes
+Connection: close
+
+
+
+ An Example Page
+
+
+ Hello World, this is a very simple HTML document.
+
+
+
+"""
+
+PARSE_CONTENT_LENGTH_TOO_SHORT = """HTTP/1.1 200 OK
+Date: Mon, 23 May 2005 22:38:34 GMT
+Content-Type: text/html; charset=UTF-8
+Content-Encoding: UTF-8
+Content-Length: 1000
+Last-Modified: Wed, 08 Jan 2003 23:11:55 GMT
+Server: Apache/1.3.3.7 (Unix) (Red-Hat/Linux)
+ETag: "3f80f-1b6-3e1cb03b"
+Accept-Ranges: bytes
+Connection: close
+
+
+
+ An Example Page
+
+
+ Hello World, this is a very simple HTML document.
+
+
+
+"""
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tempesta_fw/t/functional/sessions/__init__.py b/tempesta_fw/t/functional/sessions/__init__.py
new file mode 100644
index 0000000000..85c6340333
--- /dev/null
+++ b/tempesta_fw/t/functional/sessions/__init__.py
@@ -0,0 +1 @@
+__all__ = ['test_cookies']
diff --git a/tempesta_fw/t/functional/sessions/test_cookies.py b/tempesta_fw/t/functional/sessions/test_cookies.py
new file mode 100644
index 0000000000..f120f1e763
--- /dev/null
+++ b/tempesta_fw/t/functional/sessions/test_cookies.py
@@ -0,0 +1,63 @@
+from __future__ import print_function
+import sys
+from helpers import tf_cfg, control
+from testers import stress
+
+__author__ = 'Tempesta Technologies, Inc.'
+__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.'
+__license__ = 'GPL2'
+
+config_cookies = """
+cache 0;
+sticky;
+sticky_secret "f00)9eR59*_/22";
+sess_lifetime 100;
+"""
+config_cookies_enforced = """
+cache 0;
+sticky enforce;
+sticky_secret "f00)9eR59*_/22";
+sess_lifetime 100;
+"""
+
+# Example User-Agent header; the id must be filled in before use.
+ua_example = 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:47.0; id:%d) Gecko/20100101 Firefox/47.0'
+
+class StressCookies(stress.StressTest):
+    """ Stress test for cookies. Clients do not support cookies. """
+
+    def create_clients_helper(self, client_class):
+        """ Cookies depend on the IP address and the User-Agent header. We
+        cannot change the IP address in the test, but we can start several
+        traffic generators, each with a unique User-Agent header, instead.
+        """
+        self.clients = []
+        conns = int(tf_cfg.cfg.get('General', 'concurrent_connections'))
+        for i in range(conns):
+            client = client_class()
+            client.set_user_agent(ua_example % i)
+            client.connections = 1
+            self.clients.append(client)
+
+    def create_clients(self):
+        self.create_clients_helper(control.Wrk)
+
+    def test_cookies(self):
+        # FIXME: #383 workaround
+        for s in self.servers:
+            s.config.set_ka(sys.maxsize)
+        self.generic_test_routine(config_cookies)
+
+
+class StressEnforcedCookies(StressCookies):
+    """ Stress test for cookies. Clients support cookies. Cookies are enforced.
+ """ + + def create_clients(self): + self.create_clients_helper(control.Siege) + + def test_cookies(self): + # FIXME: #383 workaround + for s in self.servers: + s.config.set_ka(sys.maxsize) + self.generic_test_routine(config_cookies_enforced) diff --git a/tempesta_fw/t/functional/testers/__init__.py b/tempesta_fw/t/functional/testers/__init__.py new file mode 100644 index 0000000000..0bad1c03fd --- /dev/null +++ b/tempesta_fw/t/functional/testers/__init__.py @@ -0,0 +1 @@ +__all__ = ['functional', 'testers'] diff --git a/tempesta_fw/t/functional/testers/functional.py b/tempesta_fw/t/functional/testers/functional.py new file mode 100644 index 0000000000..b031b9c24a --- /dev/null +++ b/tempesta_fw/t/functional/testers/functional.py @@ -0,0 +1,169 @@ +from __future__ import print_function +import unittest +import copy +from helpers import tf_cfg, control, tempesta, deproxy + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class FunctionalTest(unittest.TestCase): + + def create_client(self): + """ Override to set desired list of benchmarkers and their options. """ + self.client = deproxy.Client() + + def create_tempesta(self): + """ Normally no override is needed. + Create controller for TempestaFW and add all servers to default group. + """ + self.tempesta = control.Tempesta() + + def configure_tempesta(self): + """ Add all servers to default server group with default scheduler. """ + sg = tempesta.ServerGroup('default') + # We run server on the Client host. + ip = tf_cfg.cfg.get('Client', 'ip') + for s in self.servers: + sg.add_server(ip, s.port, s.conns_n) + self.tempesta.config.add_sg(sg) + + def create_servers(self): + """ Overrirde to create needed amount of upstream servers. """ + port = tempesta.upstream_port_start_from() + self.servers = [deproxy.Server(port=port)] + + def create_servers_helper(self, count, start_port=None, keep_alive=None): + """ Helper function to spawn `count` servers in default configuration. + """ + if start_port is None: + start_port=tempesta.upstream_port_start_from() + self.servers = [] + for i in range(count): + self.servers.append(deproxy.Server(port=(start_port + i), + keep_alive=keep_alive)) + + def setUp(self): + self.client = None + self.tempesta = None + self.servers = [] + self.tester = None + tf_cfg.dbg(3) # Step to the next line after name of test case. + tf_cfg.dbg(3, '\tInit test case...') + self.create_tempesta() + + def tearDown(self): + # Close client connection before stopping the TempestaFW. + if self.client: + self.client.close() + if self.tempesta: + self.tempesta.stop() + if self.tester: + self.tester.close_all() + + def assert_tempesta(self): + """ Assert that tempesta had no errors during test. """ + msg = 'Tempesta have errors in processing HTTP %s.' + self.assertEqual(self.tempesta.stats.cl_msg_parsing_errors, 0, + msg=(msg % 'requests')) + self.assertEqual(self.tempesta.stats.srv_msg_parsing_errors, 0, + msg=(msg % 'responses')) + self.assertEqual(self.tempesta.stats.cl_msg_other_errors, 0, + msg=(msg % 'requests')) + self.assertEqual(self.tempesta.stats.srv_msg_other_errors, 0, + msg=(msg % 'responses')) + + def create_tester(self, message_chain): + self.tester = deproxy.Deproxy(message_chain, self.client, self.servers) + + def generic_test_routine(self, tempesta_defconfig, message_chains): + """ Make necessary updates to configs of servers, create tempesta config + and run the routine in you `test_*()` function. 
+ """ + # Set defconfig for Tempesta. + self.tempesta.config.set_defconfig(tempesta_defconfig) + + self.create_servers() + self.configure_tempesta() + + self.tempesta.start() + self.create_client() + + self.create_tester(message_chains) + self.tester.run() + + self.tempesta.get_stats() + self.assert_tempesta() + + +def base_message_chain(uri='/'): + """Base message chain. Looks like simple Curl request to Tempesta and + response for it. + + Return new message chain. + """ + request_headers = [ 'Host: %s' % tf_cfg.cfg.get('Tempesta', 'ip'), + 'User-Agent: curl/7.53.1', + 'Connection: keep-alive', + 'Accept: */*'] + request = deproxy.Request.create('GET', request_headers, uri=uri) + + fwd_request_headers = ( + request_headers + + ['Via: 1.1 tempesta_fw (Tempesta FW %s)' % tempesta.version(), + 'X-Forwarded-For: %s' % tf_cfg.cfg.get('Client', 'ip')]) + fwd_request = deproxy.Request.create('GET', fwd_request_headers, uri=uri) + + response_headers = ['Content-type: text/html', + 'Connection: keep-alive', + 'Content-Length: 138', + 'Last-Modified: Mon, 12 Dec 2016 13:59:39 GMT'] + body = ("\r\n" + "\r\n" + " An Example Page\r\n" + "\r\n" + "\r\n" + " Hello World, this is a very simple HTML document.\r\n" + "\r\n" + "\r\n") + + server_headers = response_headers + ['Server: Deproxy Server'] + server_response = deproxy.Response.create( + 200, server_headers, date=True, body=body) + + tempesta_headers = ( + response_headers + + ['Server: Tempesta FW/%s' % tempesta.version(), + 'Via: 1.1 tempesta_fw (Tempesta FW %s)' % tempesta.version(), + 'Date: %s' % server_response.headers['Date']]) + tempesta_response = deproxy.Response.create( + 200, tempesta_headers, body=body) + + base_chain = deproxy.MessageChain(request=request, + expected_response=tempesta_response, + forwarded_request=fwd_request, + server_response=server_response) + + return copy.copy(base_chain) + + +def base_message_chain_chunked(uri='/'): + """Same as base_message_chain, but returns a copy of message chain with + chunked body. + """ + rule = base_message_chain() + body = ("4\r\n" + "1234\r\n" + "0\r\n" + "\r\n") + + for response in [rule.response, rule.server_response]: + response.headers.delete_all('Content-Length') + response.headers.add('Transfer-Encoding', 'chunked') + response.body = body + response.update() + + return rule + +if __name__ == '__main__': + unittest.main() diff --git a/tempesta_fw/t/functional/testers/stress.py b/tempesta_fw/t/functional/testers/stress.py new file mode 100644 index 0000000000..feddf3707c --- /dev/null +++ b/tempesta_fw/t/functional/testers/stress.py @@ -0,0 +1,144 @@ +from __future__ import print_function +import unittest +from helpers import tf_cfg, control, tempesta + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + +class StressTest(unittest.TestCase): + """ Test Suite to use HTTP benchmarkers as a clients. Can be used for + functional testing of schedulers and stress testing for other components. + """ + + def create_clients(self): + """ Override to set desired list of benchmarkers and their options. """ + self.clients = [control.Wrk()] + + def create_tempesta(self): + """ Normally no override is needed. + Create controller for TempestaFW and add all servers to default group. + """ + self.tempesta = control.Tempesta() + + def configure_tempesta(self): + """ Add all servers to default server group with default scheduler. 
""" + sg = tempesta.ServerGroup('default') + ip = tf_cfg.cfg.get('Server', 'ip') + for s in self.servers: + sg.add_server(ip, s.config.port, s.conns_n) + self.tempesta.config.add_sg(sg) + + def create_servers(self): + """ Overrirde to create needed amount of upstream servers. """ + port = tempesta.upstream_port_start_from() + self.servers = [control.Nginx(listen_port=port)] + + def create_servers_helper(self, count, start_port=None): + """ Helper function to spawn `count` servers in default configuration. + + See comment in Nginx.get_stats(). + """ + if start_port is None: + start_port=tempesta.upstream_port_start_from() + self.servers = [] + for i in range(count): + self.servers.append(control.Nginx(listen_port=(start_port + i))) + + def setUp(self): + # Init members used in tearDown function. + self.tempesta = None + self.servers = [] + tf_cfg.dbg(3) # Step to the next line after name of test case. + tf_cfg.dbg(3, '\tInit test case...') + self.create_clients() + self.create_servers() + self.create_tempesta() + + def tearDown(self): + """ Carefully stop all servers. Error on stop will make next test fail, + so mark test as failed even if eveything other is fine. + """ + # Call functions only if variables not None: there might be an error + # before tempesta would be created. + if self.tempesta: + self.tempesta.stop() + if self.servers: + control.servers_stop(self.servers) + + def show_performance(self): + if tf_cfg.v_level() < 2: + return + if tf_cfg.v_level() == 2: + # Go to new line, don't mess up output. + print() + req_total = err_total = 0 + for c in self.clients: + req, err = c.results() + req_total += req + err_total += err + tf_cfg.dbg(3, '\tClient: errors: %d, requests: %d' % (err, req)) + tf_cfg.dbg( + 2, '\tClients in total: errors: %d, requests: %d' % + (err_total, req_total)) + + + def assert_clients(self): + """ Check benchmark result: no errors happen, no packet loss. """ + cl_req_cnt = 0 + for c in self.clients: + req, err = c.results() + cl_req_cnt += req + self.assertEqual(err, 0, msg='HTTP client detected errors') + # Clients counts only complited requests and closes connections before + # Tempesta can send responses. So Tempesta recieved requests count + # differ from request count shown by clients. Didn't find any way how to + # fix that. + # Just check that difference is less than concurrent connections count. + expected_diff = int(tf_cfg.cfg.get('General', 'concurrent_connections')) + self.assertTrue((self.tempesta.stats.cl_msg_received - cl_req_cnt) <= + expected_diff) + + def assert_tempesta(self): + """ Assert that tempesta had no errors during test. """ + msg = 'Tempesta have errors in processing HTTP %s.' + self.assertEqual(self.tempesta.stats.cl_msg_parsing_errors, 0, + msg=(msg % 'requests')) + self.assertEqual(self.tempesta.stats.srv_msg_parsing_errors, 0, + msg=(msg % 'responses')) + # See comment in `assert_clients()` + expected_err = int(tf_cfg.cfg.get('General', 'concurrent_connections')) + self.assertTrue(self.tempesta.stats.cl_msg_other_errors <= + expected_err, msg=(msg % 'requests')) + self.assertTrue(self.tempesta.stats.srv_msg_other_errors <= + expected_err, msg=(msg % 'responses')) + + def assert_servers(self): + # Nothing to do for nginx in default configuration. + pass + + def servers_get_stats(self): + control.servers_get_stats(self.servers) + + def generic_test_routine(self, tempesta_defconfig): + """ Make necessary updates to configs of servers, create tempesta config + and run the routine in you `test_*()` function. 
+ """ + # Set defconfig for Tempesta. + self.tempesta.config.set_defconfig(tempesta_defconfig) + self.configure_tempesta() + control.servers_start(self.servers) + self.tempesta.start() + + control.clients_run_parallel(self.clients) + self.show_performance() + + # Tempesta statistics is valueble to client assertions. + self.tempesta.get_stats() + + self.assert_clients() + self.assert_tempesta() + self.assert_servers() + +if __name__ == '__main__': + unittest.main() diff --git a/tempesta_fw/t/functional/tests_config.ini.sample b/tempesta_fw/t/functional/tests_config.ini.sample new file mode 100644 index 0000000000..1dfed26ed3 --- /dev/null +++ b/tempesta_fw/t/functional/tests_config.ini.sample @@ -0,0 +1,126 @@ +[General] +# This section refer to testing framework itself. + +# Verbose level: +# 0 - quiet mode, result of each test is shown by symbols: +# `.` - passed, +# `F` - failed, +# `u` - unexpected success, +# `x` - expected failure. +# `s` - skipped; +# 1 - Show test names; +# 2 - Show tests names and performance counters; +# 3 - Full debug output. +# +# ex.: verbose = 2 (default 0) +# +verbose = 1 + +# Duration of every single test involving HTTP benchmarks utilities, in seconds. +# Use small values to obtain results quickly add large for more heavy stress +# tests. Default is 10 seconds. +# +# ex.: duration = 60 (default 10) +# +duration = 10 + +# Number of concurrent connections for HTTP benchmarks utilities. +# Larger values stress TempestaFW better, but require more resources. +# +# ex.: concurrent_connections = 10 (default 10) +# +concurrent_connections = 10 + + +[Client] +# Configuration for HTTP clients. + +# Binaries of utilities used for testing. Must be present in PATH or +# absolute paths must be given. + +# Apache Benchmark binary. +# +# ex.: ab = /usr/bin/ab (default ab) +# +ab = ab + +# wrk benchmark utility. +# +# ex.: wrk = /home/user/wrk/wrk (default wrk) +# +wrk = wrk + +# Siege benchmark utility. +# +# ex.: siege = /usr/bin/siege (default siege) +# +siege = /usr/bin/siege + + +[Tempesta] +# Configuration for the host running TempesaFW. Host may be remote or local +# one. Refer to Readmi.md for more information. + +# IPv4/IPv6 address of interface used for testing. This value will be used +# as a parameter in commands or configuration files. +# +# ex.: ip = 192.168.11.6 (default 127.0.0.1) +# +ip = 127.0.0.1 + +# Target host description. Host can be remote or local one. Remote hosts are +# controlled via SSH protocol. +# +# ex.: Use remote host to run TempestaFW: +# hostname = tempesta.test (default localhost) +# user = rtest (default root) +# port = 22022 (default 22) +# +# ex.: Use the local host to run HTTP clients (default): +# hostname = localhost +# +hostname = localhost + +# Absolute path to TempestaFW sources directory. +# +# ex.: workdir = /home/user/tempesta (default /root/tempesta) +# +workdir = /root/tempesta + + +[Server] +# Configuration for the host running HTTP servers. Host may be remote or local +# one. Refer to Readmi.md for more information. + +# IPv4/IPv6 address of interface used for testing. This value will be used +# as a parameter in commands or configuration files. +# +# ex.: ip = 192.168.11.7 (default 127.0.0.1) +# +ip = 127.0.0.1 + +# Target host description. Host can be remote or local one. Remote hosts are +# controlled via SSH protocol. 
+#
+# ex.: Use a remote host to run HTTP servers:
+#      hostname = remote.com (default localhost)
+#      user = rtest (default root)
+#      port = 22022 (default 22)
+#
+# ex.: Use the local host to run HTTP servers (default):
+#      hostname = localhost
+#
+hostname = localhost
+
+# NGINX binary. Must be present in PATH, or an absolute path must be given.
+#
+# ex.: nginx = /usr/bin/nginx (default nginx)
+#
+nginx = nginx
+
+# Absolute path to the sample HTTP root location. Must be reachable by nginx.
+#
+# ex.: resources = /usr/share/nginx/http/ (default /var/www/html/)
+#
+resources = /var/www/html/
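
The testers above are intended to be subclassed by concrete test modules. For illustration only (this sketch is not part of the patch), a minimal functional test built on `testers/functional.py` could look roughly as follows; the module name, class name, and the Tempesta config string are hypothetical:

```python
# Hypothetical example (not part of this patch): a minimal functional test
# that drives one request/response pair through TempestaFW via deproxy.
from __future__ import print_function
import unittest

from testers import functional

# Assumed minimal TempestaFW config; listeners and the default server group
# are added by FunctionalTest.configure_tempesta() inside
# generic_test_routine().
DEFCONFIG = 'cache 0;\n'

class ExampleGetRequest(functional.FunctionalTest):
    """ Send a simple GET request through TempestaFW and compare the received
    response with the expected one from base_message_chain().
    """

    def test_get(self):
        chain = functional.base_message_chain()
        self.generic_test_routine(DEFCONFIG, [chain])

if __name__ == '__main__':
    unittest.main()
```

A workload test would follow the same pattern but subclass `stress.StressTest` and call `generic_test_routine()` with only the config string, as `sessions/test_cookies.py` does above.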