From fa15183531064ac2ebfd3f0686ca8b8d142aea18 Mon Sep 17 00:00:00 2001 From: Patrik Ragnarsson Date: Tue, 3 Oct 2023 20:07:06 +0200 Subject: [PATCH] Disconnect clients on broken upstream connection This change also removes the code added in https://github.com/cloudamqp/amqproxy/pull/104 The motivation behind that is * IO:_Error that happens in Upstream#connect is raised as Upstream::Error * IO::Error that happens in Upstream#read_loop is rescued in that method * IO::Error that happens in Client#read_loop is raised as Client::Error Adds an integration test with the php-amqp client run via Docker Compose and Toxiproxy. Close https://github.com/cloudamqp/amqproxy/issues/118 Close https://github.com/cloudamqp/amqproxy/issues/111 Close https://github.com/cloudamqp/amqproxy/issues/98 --- .github/workflows/ci.yml | 9 +++ src/amqproxy/server.cr | 4 -- src/amqproxy/upstream.cr | 4 +- test/integration-php.sh | 16 ++++++ test/integration-php/docker-compose.yml | 57 +++++++++++++++++++ test/integration-php/php-amqp/Dockerfile | 21 +++++++ test/integration-php/php-amqp/get-test.php | 51 +++++++++++++++++ test/integration-php/toxiproxy-cli/Dockerfile | 5 ++ .../toxiproxy-cli/entrypoint.sh | 15 +++++ test/integration-php/toxiproxy.json | 8 +++ 10 files changed, 185 insertions(+), 5 deletions(-) create mode 100755 test/integration-php.sh create mode 100644 test/integration-php/docker-compose.yml create mode 100644 test/integration-php/php-amqp/Dockerfile create mode 100644 test/integration-php/php-amqp/get-test.php create mode 100644 test/integration-php/toxiproxy-cli/Dockerfile create mode 100644 test/integration-php/toxiproxy-cli/entrypoint.sh create mode 100644 test/integration-php/toxiproxy.json diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4113f1e..c67fdff 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,3 +45,12 @@ jobs: steps: - uses: actions/checkout@v3 - run: crystal tool format --check + + integration: + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - name: Checkout + uses: actions/checkout@v4 + - name: PHP client integration test using Docker Compose + run: ./test/integration-php.sh diff --git a/src/amqproxy/server.cr b/src/amqproxy/server.cr index 9d5085d..004dbdc 100644 --- a/src/amqproxy/server.cr +++ b/src/amqproxy/server.cr @@ -86,10 +86,6 @@ module AMQProxy close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "UPSTREAM_ERROR", 0_u16, 0_u16) close.to_io socket, IO::ByteFormat::NetworkEndian socket.flush - rescue ex : IO::Error - @log.error { "IO Error for user '#{user}' to vhost '#{vhost}': #{ex.message}" } - close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "IO_ERROR", 0_u16, 0_u16) - close.to_io socket, IO::ByteFormat::NetworkEndian end rescue ex : Client::Error @log.debug { "Client disconnected: #{remote_address}: #{ex.inspect}" } diff --git a/src/amqproxy/upstream.cr b/src/amqproxy/upstream.cr index 3460d26..6d09105 100644 --- a/src/amqproxy/upstream.cr +++ b/src/amqproxy/upstream.cr @@ -40,6 +40,7 @@ module AMQProxy # Frames from upstream (to client) def read_loop # ameba:disable Metrics/CyclomaticComplexity socket = @socket + client = @current_client loop do AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) do |frame| case frame @@ -55,7 +56,7 @@ module AMQProxy write frame next end - if client = @current_client + if client begin client.write(frame) rescue ex @@ -84,6 +85,7 @@ module AMQProxy @log.error "Error reading from upstream: #{ex.inspect_with_backtrace}" unless @socket.closed? ensure @socket.close unless @socket.closed? + client.close_socket if client end SAFE_BASIC_METHODS = {40, 10} # qos and publish diff --git a/test/integration-php.sh b/test/integration-php.sh new file mode 100755 index 0000000..80be847 --- /dev/null +++ b/test/integration-php.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +set -x +set -e + +# --force-recreate and --renew-anon-volumes needed to start fresh every time +# otherwise broker datadir volume may be re-used + +docker-compose \ + --file test/integration-php/docker-compose.yml \ + up \ + --remove-orphans \ + --force-recreate \ + --renew-anon-volumes \ + --build \ + --exit-code-from php-amqp diff --git a/test/integration-php/docker-compose.yml b/test/integration-php/docker-compose.yml new file mode 100644 index 0000000..229be12 --- /dev/null +++ b/test/integration-php/docker-compose.yml @@ -0,0 +1,57 @@ +version: "3.8" +configs: + toxiproxy_config: + file: ./toxiproxy.json +services: + amqproxy: + build: ../../ + expose: + - "5673" + entrypoint: ["amqproxy", "--debug", "--listen=0.0.0.0", "amqp://toxiproxy:7777"] + depends_on: + toxiproxy: + condition: service_started + + toxiproxy: + image: ghcr.io/shopify/toxiproxy:latest + expose: + - "8474" # Toxiproxy HTTP API + - "7777" # Expose AMQP broker on this port + command: ["-host=0.0.0.0", "-config=/toxiproxy_config"] + configs: + - toxiproxy_config + depends_on: + lavinmq: + condition: service_healthy + + lavinmq: + image: cloudamqp/lavinmq:latest + expose: + - "5672" + healthcheck: + test: ["CMD-SHELL", "lavinmqctl status"] + interval: 1s + timeout: 1s + retries: 10 + + php-amqp: + build: ./php-amqp + environment: + TEST_RABBITMQ_HOST: amqproxy + TEST_RABBITMQ_PORT: 5673 + # Makes this container exit with error if the test script hangs (timeout) + command: ["timeout", "15s", "php", "-d", "extension=amqp.so", "get-test.php"] + depends_on: + amqproxy: + condition: service_started + + toxiproxy-cli: + # Need to build our own image as the toxiproxy image has no shell + build: ./toxiproxy-cli + environment: + TOXIPROXY_URL: "http://toxiproxy:8474" + # The PHP client needs to start and connect before we add the toxiproxy toxic + # that will brake the amqproxy upstream connection, depends_on makes this happen + depends_on: + php-amqp: + condition: service_started diff --git a/test/integration-php/php-amqp/Dockerfile b/test/integration-php/php-amqp/Dockerfile new file mode 100644 index 0000000..a33eafb --- /dev/null +++ b/test/integration-php/php-amqp/Dockerfile @@ -0,0 +1,21 @@ +FROM php:7.4-cli +WORKDIR /app + +RUN apt-get update -q \ + && apt-get install -qq cmake libssl-dev git unzip \ + && rm -rf /var/lib/apt/lists/* + +# Install librabbitmq (https://github.com/alanxz/rabbitmq-c) +RUN \ + curl --location --silent --output /tmp/rabbitmq-c.tar.gz https://github.com/alanxz/rabbitmq-c/archive/v0.10.0.tar.gz \ + && mkdir -p /tmp/rabbitmq-c/build \ + && tar --gunzip --extract --strip-components 1 --directory /tmp/rabbitmq-c --file /tmp/rabbitmq-c.tar.gz \ + && cd /tmp/rabbitmq-c/build \ + && cmake -DBUILD_EXAMPLES=OFF -DBUILD_TESTS=OFF -DBUILD_TOOLS=OFF -DENABLE_SSL_SUPPORT=ON .. \ + && cmake --build . --target install \ + && ln -s /usr/local/lib/x86_64-linux-gnu/librabbitmq.so.4 /usr/local/lib/ + +# Install php-amqp (https://github.com/php-amqp/php-amqp) +RUN echo /usr/local | pecl install amqp + +COPY *.php ./ diff --git a/test/integration-php/php-amqp/get-test.php b/test/integration-php/php-amqp/get-test.php new file mode 100644 index 0000000..2120792 --- /dev/null +++ b/test/integration-php/php-amqp/get-test.php @@ -0,0 +1,51 @@ +setHost(HOST); +$connection->setPort(PORT); +$connection->setLogin(USER); +$connection->setPassword(PASS); +$connection->connect(); + +$channel = new AMQPChannel($connection); + +$exchange_name = "test-ex"; +$exchange = new AMQPExchange($channel); +$exchange->setType(AMQP_EX_TYPE_FANOUT); +$exchange->setName($exchange_name); +$exchange->declareExchange(); + +$queue = new AMQPQueue($channel); +$queue->setName("test-q"); +$queue->declareQueue(); +$queue->bind($exchange_name,$queue->getName()); + +$i = 1; +$exit_code = 1; // exception should be raised if the test setup works correctly + +while ($i <= ATTEMPTS) { + echo "Getting messages, attempt #", $i, PHP_EOL; + try { + $queue->get(AMQP_AUTOACK); + sleep(1); + } catch(Exception $e) { + $exit_code = 0; + echo "Caught exception: ", get_class($e), ": ", $e->getMessage(), PHP_EOL; + break; + } + $i++; +} + +if($exit_code == 1) { + echo "FAIL! Exception should be raised when the test setup works correctly", PHP_EOL; +} else { + echo "SUCCESS! Exception was raised.", PHP_EOL; +} +echo "Exiting with exit code: ", $exit_code, PHP_EOL; +exit($exit_code); diff --git a/test/integration-php/toxiproxy-cli/Dockerfile b/test/integration-php/toxiproxy-cli/Dockerfile new file mode 100644 index 0000000..b581cbb --- /dev/null +++ b/test/integration-php/toxiproxy-cli/Dockerfile @@ -0,0 +1,5 @@ +FROM alpine:latest +COPY --from=ghcr.io/shopify/toxiproxy:latest ./toxiproxy-cli /toxiproxy-cli +COPY ./entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh +ENTRYPOINT ["/entrypoint.sh"] diff --git a/test/integration-php/toxiproxy-cli/entrypoint.sh b/test/integration-php/toxiproxy-cli/entrypoint.sh new file mode 100644 index 0000000..cfffe03 --- /dev/null +++ b/test/integration-php/toxiproxy-cli/entrypoint.sh @@ -0,0 +1,15 @@ +#!/bin/sh + +set -x + +/toxiproxy-cli toxic \ + add \ + --type reset_peer \ + --attribute timeout=2000 \ + --toxicName reset_peer between-proxy-and-broker + +echo "Toxic added, sleeping..." + +# We sleep so the container keeps running as otherwise all containers would stop +# as we are using --exit-code-from (implies --abort-on-container-exit) +sleep 900 diff --git a/test/integration-php/toxiproxy.json b/test/integration-php/toxiproxy.json new file mode 100644 index 0000000..4d3bbbc --- /dev/null +++ b/test/integration-php/toxiproxy.json @@ -0,0 +1,8 @@ +[ + { + "name": "between-proxy-and-broker", + "listen": "0.0.0.0:7777", + "upstream": "lavinmq:5672", + "enabled": true + } +]