Skip to content

Commit

Permalink
Disconnect clients on broken upstream connection
Browse files Browse the repository at this point in the history
This change also removes the code added in #104

The motivation behind that is

* IO:_Error that happens in Upstream#connect is raised as Upstream::Error
* IO::Error that happens in Upstream#read_loop is rescued in that method
* IO::Error that happens in Client#read_loop is raised as Client::Error

Adds an integration test with the php-amqp client run via Docker Compose
and Toxiproxy.

Close #118
Close #111
Close #98
  • Loading branch information
dentarg committed Oct 3, 2023
1 parent 22914d0 commit fa15183
Show file tree
Hide file tree
Showing 10 changed files with 185 additions and 5 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,12 @@ jobs:
steps:
- uses: actions/checkout@v3
- run: crystal tool format --check

integration:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- name: Checkout
uses: actions/checkout@v4
- name: PHP client integration test using Docker Compose
run: ./test/integration-php.sh
4 changes: 0 additions & 4 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ module AMQProxy
close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "UPSTREAM_ERROR", 0_u16, 0_u16)
close.to_io socket, IO::ByteFormat::NetworkEndian
socket.flush
rescue ex : IO::Error
@log.error { "IO Error for user '#{user}' to vhost '#{vhost}': #{ex.message}" }
close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "IO_ERROR", 0_u16, 0_u16)
close.to_io socket, IO::ByteFormat::NetworkEndian
end
rescue ex : Client::Error
@log.debug { "Client disconnected: #{remote_address}: #{ex.inspect}" }
Expand Down
4 changes: 3 additions & 1 deletion src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ module AMQProxy
# Frames from upstream (to client)
def read_loop # ameba:disable Metrics/CyclomaticComplexity
socket = @socket
client = @current_client
loop do
AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) do |frame|
case frame
Expand All @@ -55,7 +56,7 @@ module AMQProxy
write frame
next
end
if client = @current_client
if client
begin
client.write(frame)
rescue ex
Expand Down Expand Up @@ -84,6 +85,7 @@ module AMQProxy
@log.error "Error reading from upstream: #{ex.inspect_with_backtrace}" unless @socket.closed?
ensure
@socket.close unless @socket.closed?
client.close_socket if client
end

SAFE_BASIC_METHODS = {40, 10} # qos and publish
Expand Down
16 changes: 16 additions & 0 deletions test/integration-php.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

set -x
set -e

# --force-recreate and --renew-anon-volumes needed to start fresh every time
# otherwise broker datadir volume may be re-used

docker-compose \
--file test/integration-php/docker-compose.yml \
up \
--remove-orphans \
--force-recreate \
--renew-anon-volumes \
--build \
--exit-code-from php-amqp
57 changes: 57 additions & 0 deletions test/integration-php/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
version: "3.8"
configs:
toxiproxy_config:
file: ./toxiproxy.json
services:
amqproxy:
build: ../../
expose:
- "5673"
entrypoint: ["amqproxy", "--debug", "--listen=0.0.0.0", "amqp://toxiproxy:7777"]
depends_on:
toxiproxy:
condition: service_started

toxiproxy:
image: ghcr.io/shopify/toxiproxy:latest
expose:
- "8474" # Toxiproxy HTTP API
- "7777" # Expose AMQP broker on this port
command: ["-host=0.0.0.0", "-config=/toxiproxy_config"]
configs:
- toxiproxy_config
depends_on:
lavinmq:
condition: service_healthy

lavinmq:
image: cloudamqp/lavinmq:latest
expose:
- "5672"
healthcheck:
test: ["CMD-SHELL", "lavinmqctl status"]
interval: 1s
timeout: 1s
retries: 10

php-amqp:
build: ./php-amqp
environment:
TEST_RABBITMQ_HOST: amqproxy
TEST_RABBITMQ_PORT: 5673
# Makes this container exit with error if the test script hangs (timeout)
command: ["timeout", "15s", "php", "-d", "extension=amqp.so", "get-test.php"]
depends_on:
amqproxy:
condition: service_started

toxiproxy-cli:
# Need to build our own image as the toxiproxy image has no shell
build: ./toxiproxy-cli
environment:
TOXIPROXY_URL: "http://toxiproxy:8474"
# The PHP client needs to start and connect before we add the toxiproxy toxic
# that will brake the amqproxy upstream connection, depends_on makes this happen
depends_on:
php-amqp:
condition: service_started
21 changes: 21 additions & 0 deletions test/integration-php/php-amqp/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
FROM php:7.4-cli
WORKDIR /app

RUN apt-get update -q \
&& apt-get install -qq cmake libssl-dev git unzip \
&& rm -rf /var/lib/apt/lists/*

# Install librabbitmq (https://github.com/alanxz/rabbitmq-c)
RUN \
curl --location --silent --output /tmp/rabbitmq-c.tar.gz https://github.com/alanxz/rabbitmq-c/archive/v0.10.0.tar.gz \
&& mkdir -p /tmp/rabbitmq-c/build \
&& tar --gunzip --extract --strip-components 1 --directory /tmp/rabbitmq-c --file /tmp/rabbitmq-c.tar.gz \
&& cd /tmp/rabbitmq-c/build \
&& cmake -DBUILD_EXAMPLES=OFF -DBUILD_TESTS=OFF -DBUILD_TOOLS=OFF -DENABLE_SSL_SUPPORT=ON .. \
&& cmake --build . --target install \
&& ln -s /usr/local/lib/x86_64-linux-gnu/librabbitmq.so.4 /usr/local/lib/

# Install php-amqp (https://github.com/php-amqp/php-amqp)
RUN echo /usr/local | pecl install amqp

COPY *.php ./
51 changes: 51 additions & 0 deletions test/integration-php/php-amqp/get-test.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

define("HOST", getenv("TEST_RABBITMQ_HOST") ? getenv("TEST_RABBITMQ_HOST") : "localhost");
define("PORT", getenv("TEST_RABBITMQ_PORT") ? getenv("TEST_RABBITMQ_PORT") : "5672");
define("USER", getenv("TEST_RABBITMQ_USER") ? getenv("TEST_RABBITMQ_USER") : "guest");
define("PASS", getenv("TEST_RABBITMQ_PASS") ? getenv("TEST_RABBITMQ_PASS") : "guest");
define("ATTEMPTS", getenv("TEST_ATTEMPTS") ? getenv("TEST_ATTEMPTS") : 10);

$connection = new AMQPConnection();
$connection->setHost(HOST);
$connection->setPort(PORT);
$connection->setLogin(USER);
$connection->setPassword(PASS);
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange_name = "test-ex";
$exchange = new AMQPExchange($channel);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setName($exchange_name);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName("test-q");
$queue->declareQueue();
$queue->bind($exchange_name,$queue->getName());

$i = 1;
$exit_code = 1; // exception should be raised if the test setup works correctly

while ($i <= ATTEMPTS) {
echo "Getting messages, attempt #", $i, PHP_EOL;
try {
$queue->get(AMQP_AUTOACK);
sleep(1);
} catch(Exception $e) {
$exit_code = 0;
echo "Caught exception: ", get_class($e), ": ", $e->getMessage(), PHP_EOL;
break;
}
$i++;
}

if($exit_code == 1) {
echo "FAIL! Exception should be raised when the test setup works correctly", PHP_EOL;
} else {
echo "SUCCESS! Exception was raised.", PHP_EOL;
}
echo "Exiting with exit code: ", $exit_code, PHP_EOL;
exit($exit_code);
5 changes: 5 additions & 0 deletions test/integration-php/toxiproxy-cli/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM alpine:latest
COPY --from=ghcr.io/shopify/toxiproxy:latest ./toxiproxy-cli /toxiproxy-cli
COPY ./entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
15 changes: 15 additions & 0 deletions test/integration-php/toxiproxy-cli/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/sh

set -x

/toxiproxy-cli toxic \
add \
--type reset_peer \
--attribute timeout=2000 \
--toxicName reset_peer between-proxy-and-broker

echo "Toxic added, sleeping..."

# We sleep so the container keeps running as otherwise all containers would stop
# as we are using --exit-code-from (implies --abort-on-container-exit)
sleep 900
8 changes: 8 additions & 0 deletions test/integration-php/toxiproxy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[
{
"name": "between-proxy-and-broker",
"listen": "0.0.0.0:7777",
"upstream": "lavinmq:5672",
"enabled": true
}
]

0 comments on commit fa15183

Please sign in to comment.