Skip to content

Commit

Permalink
Update current client in Upstream#read_loop (#138)
Browse files Browse the repository at this point in the history
Fix regression introduced by #128

Close #135
  • Loading branch information
dentarg authored Oct 20, 2023
1 parent 8462a50 commit 1abcc51
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 18 deletions.
19 changes: 5 additions & 14 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ name: CI
on:
push:
paths:
- 'run-specs-in-docker.sh'
- '.github/workflows/ci.yml'
- 'shard.yml'
- 'shard.lock'
Expand All @@ -12,28 +13,17 @@ on:
jobs:
spec:
runs-on: ubuntu-latest
container: 84codes/crystal:latest-ubuntu-22.04
timeout-minutes: 10
steps:
- name: Install RabbitMQ
run: apt-get update && apt-get install -y rabbitmq-server

- name: Start RabbitMQ
run: RABBITMQ_PID_FILE=/tmp/rabbitmq.pid rabbitmq-server -detached

- name: Checkout
uses: actions/checkout@v4

- name: Install shards
run: sed -i '/ameba/d' shard.yml && shards install

- name: Wait for RabbitMQ to start
run: rabbitmqctl wait /tmp/rabbitmq.pid

- name: Run tests
run: crystal spec --order random
run: ./run-specs-in-docker.sh

lint:
runs-on: ubuntu-latest
timeout-minutes: 10
container: 84codes/crystal:latest-ubuntu-22.04
steps:
- uses: actions/checkout@v4
Expand All @@ -42,6 +32,7 @@ jobs:

format:
runs-on: ubuntu-latest
timeout-minutes: 10
container: 84codes/crystal:latest-ubuntu-22.04
steps:
- uses: actions/checkout@v4
Expand Down
21 changes: 20 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
# Contributing

_TODO_
## Development

Run tests in Docker:

```bash
docker build . -f spec/Dockerfile -t amqproxy_spec
docker run --rm -it -v $(pwd):/app -w /app --entrypoint bash amqproxy_spec

# ensure rabbitmq is up, run all specs
./entrypoint.sh

# run single spec
crystal spec --example "keeps connections open"
```

Run tests using Docker Compose:

```bash
./run-specs-in-docker.sh
```

## Release

Expand Down
13 changes: 13 additions & 0 deletions run-specs-in-docker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash

set -x
set -e

docker compose \
--file spec/docker-compose.yml \
up \
--remove-orphans \
--force-recreate \
--renew-anon-volumes \
--build \
--exit-code-from spec
16 changes: 16 additions & 0 deletions spec/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM 84codes/crystal:latest-ubuntu-22.04

RUN apt-get update && apt-get install -y rabbitmq-server

WORKDIR /tmp

# We want to install shards before copying code/spec files for quicker runs
COPY shard.yml shard.lock ./
RUN shards install

COPY src/ src/
COPY spec/ spec/

COPY spec/entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
38 changes: 38 additions & 0 deletions spec/amqproxy_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,44 @@ describe AMQProxy::Server do
end
end

it "publish and consume works" do
server = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
begin
spawn { server.listen("127.0.0.1", 5673) }
Fiber.yield

queue_name = "amqproxy-test-queue"
num_received_messages = 0
num_messages_to_publish = 5

num_messages_to_publish.times do
AMQP::Client.start("amqp://localhost:5673") do |conn|
channel = conn.channel
queue = channel.queue(queue_name)
queue.publish_confirm("Message from AMQProxy specs")
end
sleep 0.1
end

AMQP::Client.start("amqp://localhost:5673") do |conn|
channel = conn.channel
channel.basic_consume(queue_name, tag: "AMQProxy specs") do |msg|
body = msg.body_io.to_s
if body == "Message from AMQProxy specs"
# FIXME: ack:ing causes this bug
# https://github.com/cloudamqp/amqproxy/issues/137
# channel.basic_ack(msg.delivery_tag)
num_received_messages += 1
end
end
end

num_received_messages.should eq num_messages_to_publish
ensure
server.stop_accepting_clients
end
end

it "can reconnect if upstream closes" do
s = AMQProxy::Server.new("127.0.0.1", 5672, false, Logger::DEBUG)
begin
Expand Down
6 changes: 6 additions & 0 deletions spec/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
version: "3.8"
services:
spec:
build:
context: ..
dockerfile: spec/Dockerfile
16 changes: 16 additions & 0 deletions spec/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

set -e
set -u
set -x

export RABBITMQ_PID_FILE=/tmp/rabbitmq.pid

# Start RabbitMQ
rabbitmq-server -detached

# Wait for RabbitMQ to start
rabbitmqctl wait $RABBITMQ_PID_FILE

crystal --version
crystal spec --order random
30 changes: 30 additions & 0 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,33 @@ require "../src/amqproxy/version"
require "amqp-client"

MAYBE_SUDO = (ENV.has_key?("NO_SUDO") || `id -u` == "0\n") ? "" : "sudo "

# Spec timeout borrowed from Crystal project:
# https://github.com/crystal-lang/crystal/blob/1.10.1/spec/support/mt_abort_timeout.cr

private SPEC_TIMEOUT = 15.seconds

Spec.around_each do |example|
done = Channel(Exception?).new

spawn(same_thread: true) do
begin
example.run
rescue e
done.send(e)
else
done.send(nil)
end
end

timeout = SPEC_TIMEOUT

select
when res = done.receive
raise res if res
when timeout(timeout)
_it = example.example
ex = Spec::AssertionFailed.new("spec timed out after #{timeout}", _it.file, _it.line)
_it.parent.report(:fail, _it.description, _it.file, _it.line, timeout, ex)
end
end
5 changes: 2 additions & 3 deletions src/amqproxy/upstream.cr
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ module AMQProxy
# Frames from upstream (to client)
def read_loop # ameba:disable Metrics/CyclomaticComplexity
socket = @socket
client = @current_client
loop do
AMQ::Protocol::Frame.from_io(socket, IO::ByteFormat::NetworkEndian) do |frame|
case frame
Expand All @@ -56,7 +55,7 @@ module AMQProxy
write frame
next
end
if client
if client = @current_client
begin
client.write(frame)
rescue ex
Expand Down Expand Up @@ -85,7 +84,7 @@ module AMQProxy
@log.error "Error reading from upstream: #{ex.inspect_with_backtrace}" unless @socket.closed?
ensure
@socket.close unless @socket.closed?
client.close_socket if client
@current_client.try &.close_socket
end

SAFE_BASIC_METHODS = {40, 10} # qos and publish
Expand Down

0 comments on commit 1abcc51

Please sign in to comment.