Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disconnect clients on broken upstream connection #128

Merged
merged 7 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:
- 'shard.lock'
- 'src/**'
- 'spec/**'
- 'test/**'

jobs:
spec:
Expand Down Expand Up @@ -45,3 +46,12 @@ jobs:
steps:
- uses: actions/checkout@v3
- run: crystal tool format --check

integration:
runs-on: ubuntu-latest
timeout-minutes: 10
steps:
- name: Checkout
uses: actions/checkout@v4
- name: PHP client integration test using Docker Compose
run: ./test/integration-php.sh
4 changes: 0 additions & 4 deletions src/amqproxy/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ module AMQProxy
close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "UPSTREAM_ERROR", 0_u16, 0_u16)
close.to_io socket, IO::ByteFormat::NetworkEndian
socket.flush
rescue ex : IO::Error
@log.error { "IO Error for user '#{user}' to vhost '#{vhost}': #{ex.message}" }
close = AMQ::Protocol::Frame::Connection::Close.new(403_u16, "IO_ERROR", 0_u16, 0_u16)
close.to_io socket, IO::ByteFormat::NetworkEndian
end
rescue ex : Client::Error
@log.debug { "Client disconnected: #{remote_address}: #{ex.inspect}" }
Expand Down
4 changes: 3 additions & 1 deletion src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ module AMQProxy
# Frames from upstream (to client)
def read_loop # ameba:disable Metrics/CyclomaticComplexity
socket = @socket
client = @current_client
loop do
AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) do |frame|
case frame
Expand All @@ -55,7 +56,7 @@ module AMQProxy
write frame
next
end
if client = @current_client
if client
begin
client.write(frame)
rescue ex
Expand Down Expand Up @@ -84,6 +85,7 @@ module AMQProxy
@log.error "Error reading from upstream: #{ex.inspect_with_backtrace}" unless @socket.closed?
ensure
@socket.close unless @socket.closed?
client.close_socket if client
end

SAFE_BASIC_METHODS = {40, 10} # qos and publish
Expand Down
16 changes: 16 additions & 0 deletions test/integration-php.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

set -x
set -e

# --force-recreate and --renew-anon-volumes needed to start fresh every time
# otherwise broker datadir volume may be re-used

docker compose \
--file test/integration-php/docker-compose.yml \
up \
--remove-orphans \
--force-recreate \
--renew-anon-volumes \
--build \
--exit-code-from php-amqp
57 changes: 57 additions & 0 deletions test/integration-php/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
version: "3.8"
configs:
toxiproxy_config:
file: ./toxiproxy.json
services:
amqproxy:
build: ../../
expose:
- "5673"
entrypoint: ["amqproxy", "--debug", "--listen=0.0.0.0", "amqp://toxiproxy:7777"]
depends_on:
toxiproxy:
condition: service_started

toxiproxy:
image: ghcr.io/shopify/toxiproxy:latest
expose:
- "8474" # Toxiproxy HTTP API
- "7777" # Expose AMQP broker on this port
command: ["-host=0.0.0.0", "-config=/toxiproxy_config"]
configs:
- toxiproxy_config
depends_on:
rabbitmq:
condition: service_healthy

rabbitmq:
image: rabbitmq:latest
expose:
- "5672"
healthcheck:
test: ["CMD-SHELL", "rabbitmqctl status"]
interval: 2s
retries: 10
start_period: 20s

php-amqp:
build: ./php-amqp
environment:
TEST_RABBITMQ_HOST: amqproxy
TEST_RABBITMQ_PORT: 5673
# Makes this container exit with error if the test script hangs (timeout)
command: ["timeout", "15s", "php", "-d", "extension=amqp.so", "get-test.php"]
depends_on:
amqproxy:
condition: service_started

toxiproxy-cli:
# Need to build our own image as the toxiproxy image has no shell
build: ./toxiproxy-cli
environment:
TOXIPROXY_URL: "http://toxiproxy:8474"
# The PHP client needs to start and connect before we add the toxiproxy toxic
# that will break the amqproxy upstream connection, depends_on makes this happen
depends_on:
php-amqp:
condition: service_started
21 changes: 21 additions & 0 deletions test/integration-php/php-amqp/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
FROM php:8.2-cli
WORKDIR /app

RUN apt-get update -q \
&& apt-get install -qq cmake libssl-dev git unzip \
&& rm -rf /var/lib/apt/lists/*

# Install librabbitmq (https://github.com/alanxz/rabbitmq-c)
RUN \
curl --location --silent --output /tmp/rabbitmq-c.tar.gz https://github.com/alanxz/rabbitmq-c/archive/v0.10.0.tar.gz \
&& mkdir -p /tmp/rabbitmq-c/build \
&& tar --gunzip --extract --strip-components 1 --directory /tmp/rabbitmq-c --file /tmp/rabbitmq-c.tar.gz \
&& cd /tmp/rabbitmq-c/build \
&& cmake -DBUILD_EXAMPLES=OFF -DBUILD_TESTS=OFF -DBUILD_TOOLS=OFF -DENABLE_SSL_SUPPORT=ON .. \
&& cmake --build . --target install \
&& ln -s /usr/local/lib/x86_64-linux-gnu/librabbitmq.so.4 /usr/local/lib/

# Install php-amqp (https://github.com/php-amqp/php-amqp)
RUN echo /usr/local | pecl install amqp

COPY *.php ./
51 changes: 51 additions & 0 deletions test/integration-php/php-amqp/get-test.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

define("HOST", getenv("TEST_RABBITMQ_HOST") ?: "localhost");
define("PORT", getenv("TEST_RABBITMQ_PORT") ?: "5672");
define("USER", getenv("TEST_RABBITMQ_USER") ?: "guest");
define("PASS", getenv("TEST_RABBITMQ_PASS") ?: "guest");
define("ATTEMPTS", getenv("TEST_ATTEMPTS") ?: 10);

$connection = new AMQPConnection();
$connection->setHost(HOST);
$connection->setPort(PORT);
$connection->setLogin(USER);
$connection->setPassword(PASS);
$connection->connect();

$channel = new AMQPChannel($connection);

$exchange_name = "test-ex";
$exchange = new AMQPExchange($channel);
$exchange->setType(AMQP_EX_TYPE_FANOUT);
$exchange->setName($exchange_name);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName("test-q");
$queue->declareQueue();
$queue->bind($exchange_name,$queue->getName());

$i = 1;
$exit_code = 1; // exception should be raised if the test setup works correctly

while ($i <= ATTEMPTS) {
echo "Getting messages, attempt #", $i, PHP_EOL;
try {
$queue->get(AMQP_AUTOACK);
sleep(1);
} catch(Exception $e) {
$exit_code = 0;
echo "Caught exception: ", get_class($e), ": ", $e->getMessage(), PHP_EOL;
break;
}
$i++;
}

if($exit_code == 1) {
echo "FAIL! Exception should be raised when the test setup works correctly", PHP_EOL;
} else {
echo "SUCCESS! Exception was raised.", PHP_EOL;
}
echo "Exiting with exit code: ", $exit_code, PHP_EOL;
exit($exit_code);
5 changes: 5 additions & 0 deletions test/integration-php/toxiproxy-cli/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM alpine:latest
COPY --from=ghcr.io/shopify/toxiproxy:latest ./toxiproxy-cli /toxiproxy-cli
COPY ./entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
15 changes: 15 additions & 0 deletions test/integration-php/toxiproxy-cli/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/sh

set -x

/toxiproxy-cli toxic \
add \
--type reset_peer \
--attribute timeout=2000 \
--toxicName reset_peer between-proxy-and-broker

echo "Toxic added, sleeping..."

# We sleep so the container keeps running as otherwise all containers would stop
# as we are using --exit-code-from (implies --abort-on-container-exit)
sleep 900
8 changes: 8 additions & 0 deletions test/integration-php/toxiproxy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
[
{
"name": "between-proxy-and-broker",
"listen": "0.0.0.0:7777",
"upstream": "rabbitmq:5672",
"enabled": true
}
]