Skip to content

Commit

Permalink
Bugfix: RemoteFileSyncer handling of error responses (#3440)
Browse files Browse the repository at this point in the history
  • Loading branch information
Net-burst authored Sep 13, 2024
1 parent bfe348b commit a69e3be
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 16 deletions.
31 changes: 25 additions & 6 deletions src/main/java/org/prebid/server/execution/RemoteFileSyncer.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.prebid.server.execution;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -69,12 +70,14 @@ public RemoteFileSyncer(RemoteFileProcessor processor,
getFileRequestOptions = new RequestOptions()
.setMethod(HttpMethod.GET)
.setTimeout(timeout)
.setAbsoluteURI(downloadUrl);
.setAbsoluteURI(downloadUrl)
.setFollowRedirects(true);

isUpdateRequiredRequestOptions = new RequestOptions()
.setMethod(HttpMethod.HEAD)
.setTimeout(timeout)
.setAbsoluteURI(downloadUrl);
.setAbsoluteURI(downloadUrl)
.setFollowRedirects(true);
}

private static void createAndCheckWritePermissionsFor(FileSystem fileSystem, String filePath) {
Expand Down Expand Up @@ -112,8 +115,7 @@ private Future<Void> deleteFile(String filePath) {
private Future<Void> syncRemoteFile(RetryPolicy retryPolicy) {
return fileSystem.open(tmpFilePath, new OpenOptions())

.compose(tmpFile -> httpClient.request(getFileRequestOptions)
.compose(HttpClientRequest::send)
.compose(tmpFile -> sendHttpRequest(getFileRequestOptions)
.compose(response -> response.pipeTo(tmpFile))
.onComplete(result -> tmpFile.close()))

Expand Down Expand Up @@ -148,8 +150,7 @@ private void setUpDeferredUpdate() {
}

private void updateIfNeeded() {
httpClient.request(isUpdateRequiredRequestOptions)
.compose(HttpClientRequest::send)
sendHttpRequest(isUpdateRequiredRequestOptions)
.compose(response -> fileSystem.exists(saveFilePath)
.compose(exists -> exists
? isLengthChanged(response)
Expand All @@ -161,6 +162,24 @@ private void updateIfNeeded() {
});
}

private Future<HttpClientResponse> sendHttpRequest(RequestOptions requestOptions) {
return httpClient.request(requestOptions)
.compose(HttpClientRequest::send)
.compose(this::validateResponse);
}

private Future<HttpClientResponse> validateResponse(HttpClientResponse response) {
final int statusCode = response.statusCode();
if (statusCode != HttpResponseStatus.OK.code()) {
return Future.failedFuture(new PreBidException(
String.format("Got unexpected response from server with status code %s and message %s",
statusCode,
response.statusMessage())));
} else {
return Future.succeededFuture(response);
}
}

private Future<Boolean> isLengthChanged(HttpClientResponse response) {
final String contentLengthParameter = response.getHeader(HttpHeaders.CONTENT_LENGTH);
return StringUtils.isNumeric(contentLengthParameter) && !contentLengthParameter.equals("0")
Expand Down
108 changes: 98 additions & 10 deletions src/test/java/org/prebid/server/execution/RemoteFileSyncerTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.prebid.server.execution;

import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -182,6 +183,8 @@ public void syncForFilepathShouldNotUpdateWhenHeadRequestReturnInvalidHead() {
.willReturn(Future.succeededFuture(httpClientRequest));
given(httpClientRequest.send())
.willReturn(Future.succeededFuture(httpClientResponse));
given(httpClientResponse.statusCode())
.willReturn(HttpResponseStatus.OK.code());
given(httpClientResponse.getHeader(HttpHeaders.CONTENT_LENGTH))
.willReturn("notnumber");

Expand Down Expand Up @@ -209,7 +212,10 @@ public void syncForFilepathShouldNotUpdateWhenPropsIsFailed() {
.willReturn(Future.succeededFuture(httpClientRequest));
given(httpClientRequest.send())
.willReturn(Future.succeededFuture(httpClientResponse));
given(httpClientResponse.getHeader(any(CharSequence.class))).willReturn(FILE_SIZE.toString());
given(httpClientResponse.statusCode())
.willReturn(HttpResponseStatus.OK.code());
given(httpClientResponse.getHeader(any(CharSequence.class)))
.willReturn(FILE_SIZE.toString());

given(fileSystem.props(anyString()))
.willReturn(Future.failedFuture(new IllegalArgumentException("ERROR")));
Expand Down Expand Up @@ -240,7 +246,10 @@ public void syncForFilepathShouldNotUpdateServiceWhenSizeEqualsContentLength() {
.willReturn(Future.succeededFuture(httpClientRequest));
given(httpClientRequest.send())
.willReturn(Future.succeededFuture(httpClientResponse));
given(httpClientResponse.getHeader(any(CharSequence.class))).willReturn(FILE_SIZE.toString());
given(httpClientResponse.statusCode())
.willReturn(HttpResponseStatus.OK.code());
given(httpClientResponse.getHeader(any(CharSequence.class)))
.willReturn(FILE_SIZE.toString());

given(fileSystem.props(anyString()))
.willReturn(Future.succeededFuture(fileProps));
Expand Down Expand Up @@ -274,8 +283,12 @@ public void syncForFilepathShouldUpdateServiceWhenSizeNotEqualsContentLength() {
.willReturn(Future.succeededFuture(httpClientRequest));
given(httpClientRequest.send())
.willReturn(Future.succeededFuture(httpClientResponse));
given(httpClientResponse.pipeTo(any())).willReturn(Future.succeededFuture());
given(httpClientResponse.getHeader(any(CharSequence.class))).willReturn(FILE_SIZE.toString());
given(httpClientResponse.pipeTo(any()))
.willReturn(Future.succeededFuture());
given(httpClientResponse.statusCode())
.willReturn(HttpResponseStatus.OK.code());
given(httpClientResponse.getHeader(any(CharSequence.class)))
.willReturn(FILE_SIZE.toString());

given(fileSystem.props(anyString()))
.willReturn(Future.succeededFuture(fileProps));
Expand All @@ -291,7 +304,8 @@ public void syncForFilepathShouldUpdateServiceWhenSizeNotEqualsContentLength() {
given(fileSystem.move(anyString(), any(), any(CopyOptions.class)))
.willReturn(Future.succeededFuture());

given(remoteFileProcessor.setDataPath(anyString())).willReturn(Future.succeededFuture());
given(remoteFileProcessor.setDataPath(anyString()))
.willReturn(Future.succeededFuture());

// when
remoteFileSyncer.sync();
Expand Down Expand Up @@ -354,7 +368,8 @@ public void syncForFilepathShouldRetryWhenFileOpeningFailed() {
.willAnswer(withSelfAndPassObjectToHandler(Future.succeededFuture()))
.willAnswer(withSelfAndPassObjectToHandler(Future.failedFuture(new RuntimeException())));

given(remoteFileProcessor.setDataPath(anyString())).willReturn(Future.succeededFuture());
given(remoteFileProcessor.setDataPath(anyString()))
.willReturn(Future.succeededFuture());

// when
remoteFileSyncer.sync();
Expand All @@ -370,7 +385,8 @@ public void syncForFilepathShouldRetryWhenFileOpeningFailed() {
@Test
public void syncForFilepathShouldDownloadFilesAndNotUpdateWhenUpdatePeriodIsNotSet() {
// given
given(remoteFileProcessor.setDataPath(anyString())).willReturn(Future.succeededFuture());
given(remoteFileProcessor.setDataPath(anyString()))
.willReturn(Future.succeededFuture());

given(fileSystem.exists(anyString()))
.willReturn(Future.succeededFuture(false));
Expand All @@ -382,6 +398,8 @@ public void syncForFilepathShouldDownloadFilesAndNotUpdateWhenUpdatePeriodIsNotS
.willReturn(Future.succeededFuture(httpClientRequest));
given(httpClientRequest.send())
.willReturn(Future.succeededFuture(httpClientResponse));
given(httpClientResponse.statusCode())
.willReturn(HttpResponseStatus.OK.code());
given(httpClientResponse.pipeTo(asyncFile))
.willReturn(Future.succeededFuture());

Expand All @@ -395,6 +413,7 @@ public void syncForFilepathShouldDownloadFilesAndNotUpdateWhenUpdatePeriodIsNotS
verify(fileSystem).open(eq(TMP_FILE_PATH), any());
verify(httpClient).request(any());
verify(asyncFile).close();
verify(httpClientResponse).statusCode();
verify(remoteFileProcessor).setDataPath(any());
verify(fileSystem).move(eq(TMP_FILE_PATH), eq(FILE_PATH), any(CopyOptions.class));
verify(vertx, never()).setTimer(eq(UPDATE_INTERVAL), any());
Expand All @@ -419,8 +438,6 @@ public void syncForFilepathShouldRetryWhenTimeoutIsReached() {
given(httpClient.request(any()))
.willReturn(Future.succeededFuture(httpClientRequest));
given(httpClientRequest.send())
.willReturn(Future.succeededFuture(httpClientResponse));
given(httpClientResponse.pipeTo(asyncFile))
.willReturn(Future.failedFuture("Timeout"));

// when
Expand All @@ -429,6 +446,7 @@ public void syncForFilepathShouldRetryWhenTimeoutIsReached() {
// then
verify(vertx, times(RETRY_COUNT)).setTimer(eq(RETRY_INTERVAL), any());
verify(fileSystem, times(RETRY_COUNT + 1)).open(eq(TMP_FILE_PATH), any());
verify(httpClientResponse, never()).pipeTo(any());

// Response handled
verify(httpClient, times(RETRY_COUNT + 1)).request(any());
Expand All @@ -437,11 +455,81 @@ public void syncForFilepathShouldRetryWhenTimeoutIsReached() {
verifyNoInteractions(remoteFileProcessor);
}

@Test
public void syncShouldNotSaveFileWhenServerRespondsWithNonOkStatusCode() {
// given
given(fileSystem.exists(anyString()))
.willReturn(Future.succeededFuture(false));
given(fileSystem.open(any(), any()))
.willReturn(Future.succeededFuture(asyncFile));
given(fileSystem.move(anyString(), anyString(), any(CopyOptions.class)))
.willReturn(Future.succeededFuture());

given(httpClient.request(any()))
.willReturn(Future.succeededFuture(httpClientRequest));
given(httpClientRequest.send())
.willReturn(Future.succeededFuture(httpClientResponse));
given(httpClientResponse.statusCode())
.willReturn(0);

// when
remoteFileSyncer.sync();

// then
verify(fileSystem, times(1)).exists(eq(FILE_PATH));
verify(fileSystem).open(eq(TMP_FILE_PATH), any());
verify(fileSystem).delete(eq(TMP_FILE_PATH));
verify(asyncFile).close();
verify(fileSystem, never()).move(eq(TMP_FILE_PATH), eq(FILE_PATH), any(CopyOptions.class));
verify(httpClient).request(any());
verify(httpClientResponse).statusCode();
verify(httpClientResponse, never()).pipeTo(any());
verify(remoteFileProcessor, never()).setDataPath(any());
verify(vertx, never()).setTimer(eq(UPDATE_INTERVAL), any());
}

@Test
public void syncShouldNotUpdateFileWhenServerRespondsWithNonOkStatusCode() {
// given
remoteFileSyncer = new RemoteFileSyncer(
remoteFileProcessor, SOURCE_URL, FILE_PATH, TMP_FILE_PATH, RETRY_POLICY,
TIMEOUT, UPDATE_INTERVAL, httpClient, vertx);

givenTriggerUpdate();

given(fileSystem.open(any(), any()))
.willReturn(Future.succeededFuture(asyncFile));
given(fileSystem.move(anyString(), anyString(), any(CopyOptions.class)))
.willReturn(Future.succeededFuture());

given(httpClient.request(any()))
.willReturn(Future.succeededFuture(httpClientRequest));
given(httpClientRequest.send())
.willReturn(Future.succeededFuture(httpClientResponse));
given(httpClientResponse.statusCode())
.willReturn(0);

// when
remoteFileSyncer.sync();

// then
verify(fileSystem, times(1)).exists(eq(FILE_PATH));
verify(fileSystem, never()).open(any(), any());
verify(fileSystem, never()).delete(any());
verify(fileSystem, never()).move(any(), any(), any(), any());
verify(asyncFile, never()).close();
verify(httpClient, times(1)).request(any());
verify(httpClientResponse).statusCode();
verify(httpClientResponse, never()).pipeTo(any());
verify(vertx).setPeriodic(eq(UPDATE_INTERVAL), any());
}

private void givenTriggerUpdate() {
given(fileSystem.exists(anyString()))
.willReturn(Future.succeededFuture(true));

given(remoteFileProcessor.setDataPath(anyString())).willReturn(Future.succeededFuture());
given(remoteFileProcessor.setDataPath(anyString()))
.willReturn(Future.succeededFuture());

given(vertx.setPeriodic(eq(UPDATE_INTERVAL), any()))
.willAnswer(withReturnObjectAndPassObjectToHandler(123L, 123L, 1))
Expand Down

0 comments on commit a69e3be

Please sign in to comment.