Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DefaultReactiveElasticsearchClient handle 5xx error with empty body. #1713

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,10 @@ private <T> Publisher<? extends T> handleServerError(Request request, ClientResp
String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());

return response.body(BodyExtractors.toMono(byte[].class)) //
.switchIfEmpty(Mono
.error(new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.",
request.getMethod(), request.getEndpoint(), statusCode), status))
)
.map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
.flatMap(content -> contentOrError(content, mediaType, status))
.flatMap(unused -> Mono
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,29 @@
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.net.URI;
import java.util.Optional;
import java.util.function.Function;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.http.HttpStatus;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
import org.springframework.web.util.UriBuilder;

/**
* @author Peter-Josef Meisch
Expand All @@ -46,30 +55,24 @@ class DefaultReactiveElasticsearchClientTest {
@Mock private HostProvider hostProvider;

@Mock private Function<SearchRequest, Request> searchRequestConverter;
@Spy private RequestCreator requestCreator;

private DefaultReactiveElasticsearchClient client;

@BeforeEach
void setUp() {
client = new DefaultReactiveElasticsearchClient(hostProvider, new RequestCreator() {
@Override
public Function<SearchRequest, Request> search() {
return searchRequestConverter;
}
}) {
@Override
public Mono<ResponseSpec> execute(ReactiveElasticsearchClientCallback callback) {
return Mono.empty();
}
};
}
@Mock private WebClient webClient;

@Test
void shouldSetAppropriateRequestParametersOnCount() {

when(requestCreator.search()).thenReturn(searchRequestConverter);
SearchRequest searchRequest = new SearchRequest("someindex") //
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));

ReactiveElasticsearchClient client = new DefaultReactiveElasticsearchClient(hostProvider, requestCreator) {
@Override
public Mono<ResponseSpec> execute(ReactiveElasticsearchClientCallback callback) {
return Mono.empty();
}
};

client.count(searchRequest).as(StepVerifier::create).verifyComplete();

ArgumentCaptor<SearchRequest> captor = ArgumentCaptor.forClass(SearchRequest.class);
Expand All @@ -79,4 +82,33 @@ void shouldSetAppropriateRequestParametersOnCount() {
assertThat(source.trackTotalHitsUpTo()).isEqualTo(TRACK_TOTAL_HITS_ACCURATE);
assertThat(source.fetchSource()).isEqualTo(FetchSourceContext.DO_NOT_FETCH_SOURCE);
}

@Test // #1712
@DisplayName("should throw ElasticsearchStatusException on server 5xx with empty body")
void shouldThrowElasticsearchStatusExceptionOnServer5xxWithEmptyBody() {

when(hostProvider.getActive(any())).thenReturn(Mono.just(webClient));
WebClient.RequestBodyUriSpec requestBodyUriSpec = mock(WebClient.RequestBodyUriSpec.class);
when(requestBodyUriSpec.uri((Function<UriBuilder, URI>) any())).thenReturn(requestBodyUriSpec);
when(requestBodyUriSpec.attribute(any(), any())).thenReturn(requestBodyUriSpec);
when(requestBodyUriSpec.headers(any())).thenReturn(requestBodyUriSpec);
when(webClient.method(any())).thenReturn(requestBodyUriSpec);
when(requestBodyUriSpec.exchangeToMono(any())).thenAnswer(invocationOnMock -> {
Function<ClientResponse, ? extends Mono<?>> responseHandler = invocationOnMock.getArgument(0);
ClientResponse clientResponse = mock(ClientResponse.class);
when(clientResponse.statusCode()).thenReturn(HttpStatus.SERVICE_UNAVAILABLE);
ClientResponse.Headers headers = mock(ClientResponse.Headers.class);
when(headers.contentType()).thenReturn(Optional.empty());
when(clientResponse.headers()).thenReturn(headers);
when(clientResponse.body(any())).thenReturn(Mono.empty());
return responseHandler.apply(clientResponse);
});

ReactiveElasticsearchClient client = new DefaultReactiveElasticsearchClient(hostProvider, requestCreator);

client.get(new GetRequest("42")) //
.as(StepVerifier::create) //
.expectError(ElasticsearchStatusException.class) //
.verify(); //
}
}