Skip to content

Commit

Permalink
Polish queue delete with parameters
Browse files Browse the repository at this point in the history
References rabbitmq#217
  • Loading branch information
acogoluegnes committed Apr 9, 2020
1 parent a324573 commit a27d589
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/http/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,7 @@ private URI uriWithPath(final String path) {
}

private URI uriWithPath(final String path, final Map<String, String> queryParams) {
LinkedMultiValueMap<String, String> map = new LinkedMultiValueMap<>();
MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
queryParams.entrySet()
.forEach(e -> map.add(e.getKey(), e.getValue()));

Expand Down
12 changes: 7 additions & 5 deletions src/main/java/com/rabbitmq/http/client/ReactorNettyClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import org.reactivestreams.Publisher;
import org.springframework.util.MultiValueMap;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -874,13 +873,16 @@ private Mono<HttpResponse> doPut(String... pathSegments) {
}

private Mono<HttpResponse> doDelete(Consumer<? super HttpHeaders> headerBuilder, Map<String, String> queryParams, String... pathSegments) {
String query = queryParams.entrySet().stream()
.map(e -> String.format("%s=%s", e.getKey(), enc(e.getValue())))
.collect(Collectors.joining("&", "?", ""));
String uri = uri(pathSegments);
if (queryParams != null && !queryParams.isEmpty()) {
uri += queryParams.entrySet().stream()
.map(e -> String.format("%s=%s", e.getKey(), enc(e.getValue())))
.collect(Collectors.joining("&", "?", ""));
}
return client.headersWhen(authorizedHeader())
.headers(headerBuilder)
.delete()
.uri(uri(pathSegments) + query)
.uri(uri)
.response()
.doOnNext(applyResponseCallback())
.map(ReactorNettyClient::toHttpResponse);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,8 +20,8 @@
import java.util.Map;

public class DeleteQueueParameters {
private boolean ifEmpty;
private boolean ifUnused;
private final boolean ifEmpty;
private final boolean ifUnused;

public DeleteQueueParameters(boolean ifEmpty, boolean ifUnused) {
this.ifEmpty = ifEmpty;
Expand All @@ -39,10 +39,10 @@ public boolean isIfUnused() {
public Map<String, String> getAsQueryParams() {
Map<String, String> params = new LinkedHashMap<>();
if (ifEmpty) {
params.put("if-empty", "true");
params.put("if-empty", Boolean.TRUE.toString());
}
if (ifUnused) {
params.put("if-unused", "true");
params.put("if-unused", Boolean.TRUE.toString());
}
return params;
}
Expand Down
17 changes: 10 additions & 7 deletions src/test/groovy/com/rabbitmq/http/client/ClientSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -907,26 +907,29 @@ class ClientSpec extends Specification {

@Unroll
def "DELETE /api/queues/{vhost}/{name}?if-empty=true"() {
final String s = UUID.randomUUID().toString()
given: "queue ${s} in vhost /"
final String queue = UUID.randomUUID().toString()
given: "queue ${queue} in vhost /"
final v = "/"
client.declareQueue(v, s, new QueueInfo(false, false, false))
client.declareQueue(v, queue, new QueueInfo(false, false, false))

List<QueueInfo> xs = client.getQueues(v)
QueueInfo x = xs.find { (it.name == s) }
QueueInfo x = xs.find { (it.name == queue) }
x != null
verifyQueueInfo(x)

and: "queue has a message"
client.publish(v, "amq.default", s, new OutboundMessage().payload("test"))
client.publish(v, "amq.default", queue, new OutboundMessage().payload("test"))

when: "client tries to delete queue ${s} in vhost /"
client.deleteQueue(v, s, new DeleteQueueParameters(true, false))
when: "client tries to delete queue ${queue} in vhost /"
client.deleteQueue(v, queue, new DeleteQueueParameters(true, false))

then: "an exception is thrown"
final e = thrown(HttpClientErrorException)
e.getStatusCode() == HttpStatus.BAD_REQUEST

cleanup:
client.deleteQueue(v, queue)

where:
client << clients()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,6 +61,7 @@ import spock.lang.Specification
import spock.lang.Unroll

import java.nio.charset.Charset
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
Expand Down Expand Up @@ -1288,27 +1289,30 @@ class ReactorNettyClientSpec extends Specification {
}

def "DELETE /api/queues/{vhost}/{name}?if-empty=true"() {
final String s = UUID.randomUUID().toString()
given: "queue ${s} in vhost /"
final String queue = UUID.randomUUID().toString()
given: "queue ${queue} in vhost /"
final v = "/"
client.declareQueue(v, s, new QueueInfo(false, false, false)).block()
client.declareQueue(v, queue, new QueueInfo(false, false, false)).block()

Flux<QueueInfo> xs = client.getQueues(v)
QueueInfo x = xs.filter( { q -> q.name.equals(s) } ).blockFirst()
QueueInfo x = xs.filter( { q -> q.name.equals(queue) } ).blockFirst()
x != null
verifyQueueInfo(x)

and: "queue has a message"
client.publish(v, "amq.default", s, new OutboundMessage().payload("test")).block()
client.publish(v, "amq.default", queue, new OutboundMessage().payload("test")).block()

when: "client tries to delete queue ${s} in vhost /"
def status = client.deleteQueue(v, s, new DeleteQueueParameters(true, false))
when: "client tries to delete queue ${queue} in vhost /"
def status = client.deleteQueue(v, queue, new DeleteQueueParameters(true, false))
.flatMap({ r -> Mono.just(r.status) })
.onErrorReturn({ t -> "Connection prematurely closed BEFORE response".equals(t.getMessage()) }, 500)
.block()

then: "HTTP status is 400 BAD REQUEST"
status == 400

cleanup:
client.deleteQueue(v, queue).block(Duration.ofSeconds(10))
}

def "GET /api/bindings"() {
Expand Down

0 comments on commit a27d589

Please sign in to comment.