Skip to content

Commit

Permalink
Merge branch 'master' into fpd-update
Browse files Browse the repository at this point in the history
  • Loading branch information
rpanchyk committed Mar 13, 2020
2 parents 6399f6a + 2b53488 commit d2de2e5
Show file tree
Hide file tree
Showing 38 changed files with 270 additions and 266 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>org.prebid</groupId>
<artifactId>prebid-server</artifactId>
<version>1.29.0-SNAPSHOT</version>
<version>1.30.0-SNAPSHOT</version>

<name>prebid-server</name>
<description>Prebid Server (Server-side Header Bidding)</description>
Expand All @@ -29,7 +29,7 @@
<javax.annotation-api.version>1.3.1</javax.annotation-api.version>
<validation-api.version>2.0.1.Final</validation-api.version>
<hibernate-validator.version>6.1.0.Final</hibernate-validator.version>
<vertx.version>3.7.1</vertx.version>
<vertx.version>3.8.3</vertx.version>
<lombok.version>1.18.4</lombok.version>
<commons.version>3.6</commons.version>
<commons.collections.version>4.1</commons.collections.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import com.iab.openrtb.request.Site;
import com.iab.openrtb.response.BidResponse;
import io.vertx.core.MultiMap;
import io.vertx.core.http.Cookie;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.Cookie;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import com.iab.openrtb.request.User;
import com.iab.openrtb.response.BidResponse;
import io.vertx.core.MultiMap;
import io.vertx.core.http.Cookie;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.Cookie;
import org.apache.commons.lang3.StringUtils;
import org.prebid.server.auction.model.AdUnitBid;
import org.prebid.server.auction.model.AdapterRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import com.iab.openrtb.response.BidResponse;
import com.iab.openrtb.response.SeatBid;
import io.vertx.core.MultiMap;
import io.vertx.core.http.Cookie;
import io.vertx.core.http.HttpMethod;
import io.vertx.ext.web.Cookie;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package org.prebid.server.cookie;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.Cookie;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.Cookie;
import io.vertx.ext.web.RoutingContext;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down
135 changes: 70 additions & 65 deletions src/main/java/org/prebid/server/execution/RemoteFileSyncer.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.CopyOptions;
Expand Down Expand Up @@ -100,78 +101,79 @@ public void syncForFilepath(RemoteFileProcessor remoteFileProcessor) {
}

private Future<Boolean> downloadIfNotExist(RemoteFileProcessor fileProcessor) {
final Future<Boolean> future = Future.future();
final Promise<Boolean> promise = Promise.promise();
checkFileExist(saveFilePath).setHandler(existResult ->
handleFileExistingWithSync(existResult, fileProcessor, future));
return future;
handleFileExistingWithSync(existResult, fileProcessor, promise));
return promise.future();
}

private Future<Boolean> checkFileExist(String filePath) {
final Future<Boolean> result = Future.future();
final Promise<Boolean> promise = Promise.promise();
fileSystem.exists(filePath, async -> {
if (async.succeeded()) {
result.complete(async.result());
promise.complete(async.result());
} else {
result.fail(String.format("Cant check if file exists %s", filePath));
promise.fail(String.format("Cant check if file exists %s", filePath));
}
});
return result;
return promise.future();
}

private void handleFileExistingWithSync(AsyncResult<Boolean> existResult, RemoteFileProcessor fileProcessor,
Future<Boolean> future) {
Promise<Boolean> promise) {
if (existResult.succeeded()) {
if (existResult.result()) {
fileProcessor.setDataPath(saveFilePath)
.setHandler(serviceRespond -> handleServiceRespond(serviceRespond, future));
.setHandler(serviceRespond -> handleServiceRespond(serviceRespond, promise));
} else {
syncRemoteFiles().setHandler(future);
syncRemoteFiles().setHandler(promise);
}
} else {
future.fail(existResult.cause());
promise.fail(existResult.cause());
}
}

private void handleServiceRespond(AsyncResult<?> processResult, Future<Boolean> future) {
private void handleServiceRespond(AsyncResult<?> processResult, Promise<Boolean> promise) {
if (processResult.failed()) {
final Throwable cause = processResult.cause();
cleanUp(saveFilePath).setHandler(removalResult -> handleCorruptedFileRemoval(removalResult, future, cause));
cleanUp(saveFilePath).setHandler(removalResult -> handleCorruptedFileRemoval(removalResult, promise,
cause));
} else {
future.complete(false);
promise.complete(false);
logger.info("Existing file {0} was successfully reused for service", saveFilePath);
}
}

private Future<Void> cleanUp(String filePath) {
final Future<Void> future = Future.future();
checkFileExist(filePath).setHandler(existResult -> handleFileExistsWithDelete(filePath, existResult, future));
return future;
final Promise<Void> promise = Promise.promise();
checkFileExist(filePath).setHandler(existResult -> handleFileExistsWithDelete(filePath, existResult, promise));
return promise.future();
}

private void handleFileExistsWithDelete(String filePath, AsyncResult<Boolean> existResult, Future<Void> future) {
private void handleFileExistsWithDelete(String filePath, AsyncResult<Boolean> existResult, Promise<Void> promise) {
if (existResult.succeeded()) {
if (existResult.result()) {
fileSystem.delete(filePath, future);
fileSystem.delete(filePath, promise);
} else {
future.complete();
promise.complete();
}
} else {
future.fail(new PreBidException(String.format("Cant check if file exists %s", filePath)));
promise.fail(new PreBidException(String.format("Cant check if file exists %s", filePath)));
}
}

private void handleCorruptedFileRemoval(AsyncResult<Void> removalResult, Future<Boolean> future,
Throwable serviceCause) {
private void handleCorruptedFileRemoval(
AsyncResult<Void> removalResult, Promise<Boolean> promise, Throwable serviceCause) {
if (removalResult.failed()) {
final Throwable cause = removalResult.cause();
future.fail(new PreBidException(
promise.fail(new PreBidException(
String.format("Corrupted file %s cant be deleted. Please check permission or delete manually.",
saveFilePath), cause));
} else {
logger.info("Existing file {0} cant be processed by service, try to download after removal",
serviceCause, saveFilePath);

syncRemoteFiles().setHandler(future);
syncRemoteFiles().setHandler(promise);
}
}

Expand All @@ -182,107 +184,110 @@ private Future<Boolean> syncRemoteFiles() {
}

private Future<Void> tryDownload() {
final Future<Void> result = Future.future();
cleanUp(tmpFilePath).setHandler(event -> handleTmpDelete(event, result));
return result;
final Promise<Void> promise = Promise.promise();
cleanUp(tmpFilePath).setHandler(event -> handleTmpDelete(event, promise));
return promise.future();
}

private void handleTmpDelete(AsyncResult<Void> tmpDeleteResult, Future<Void> result) {
private void handleTmpDelete(AsyncResult<Void> tmpDeleteResult, Promise<Void> promise) {
if (tmpDeleteResult.failed()) {
result.fail(tmpDeleteResult.cause());
promise.fail(tmpDeleteResult.cause());
} else {
download().setHandler(downloadResult -> handleDownload(downloadResult, result));
download().setHandler(downloadResult -> handleDownload(downloadResult, promise));
}
}

private Future<Void> download() {
final Future<Void> future = Future.future();
final Promise<Void> promise = Promise.promise();
final OpenOptions openOptions = new OpenOptions().setCreateNew(true);
fileSystem.open(tmpFilePath, openOptions, openResult -> handleFileOpenWithDownload(openResult, future));
return future;
fileSystem.open(tmpFilePath, openOptions, openResult -> handleFileOpenWithDownload(openResult, promise));
return promise.future();
}

private void handleFileOpenWithDownload(AsyncResult<AsyncFile> openResult, Future<Void> future) {
private void handleFileOpenWithDownload(AsyncResult<AsyncFile> openResult, Promise<Void> promise) {
if (openResult.succeeded()) {
final AsyncFile asyncFile = openResult.result();
try {
httpClient.getAbs(downloadUrl, response -> pumpFileFromRequest(response, asyncFile, future)).end();
httpClient.getAbs(downloadUrl, response -> pumpFileFromRequest(response, asyncFile, promise)).end();
} catch (Exception e) {
future.fail(e);
promise.fail(e);
}
} else {
future.fail(openResult.cause());
promise.fail(openResult.cause());
}
}

private void pumpFileFromRequest(HttpClientResponse httpClientResponse, AsyncFile asyncFile, Future<Void> future) {
private void pumpFileFromRequest(
HttpClientResponse httpClientResponse, AsyncFile asyncFile, Promise<Void> promise) {

logger.info("Trying to download file from {0}", downloadUrl);
httpClientResponse.pause();
final Pump pump = Pump.pump(httpClientResponse, asyncFile);
pump.start();
httpClientResponse.resume();

final long idTimer = setTimeoutTimer(asyncFile, pump, future);
final long idTimer = setTimeoutTimer(asyncFile, pump, promise);

httpClientResponse.endHandler(responseEndResult -> handleResponseEnd(asyncFile, idTimer, future));
httpClientResponse.endHandler(responseEndResult -> handleResponseEnd(asyncFile, idTimer, promise));
}

private long setTimeoutTimer(AsyncFile asyncFile, Pump pump, Future<Void> future) {
return vertx.setTimer(timeout, timerId -> handleTimeout(asyncFile, pump, future));
private long setTimeoutTimer(AsyncFile asyncFile, Pump pump, Promise<Void> promise) {
return vertx.setTimer(timeout, timerId -> handleTimeout(asyncFile, pump, promise));
}

private void handleTimeout(AsyncFile asyncFile, Pump pump, Future<Void> future) {
private void handleTimeout(AsyncFile asyncFile, Pump pump, Promise<Void> promise) {
pump.stop();
asyncFile.close();
if (!future.isComplete()) {
future.fail(new TimeoutException("Timeout on download"));
if (!promise.future().isComplete()) {
promise.fail(new TimeoutException("Timeout on download"));
}
}

private void handleResponseEnd(AsyncFile asyncFile, long idTimer, Future<Void> future) {
private void handleResponseEnd(AsyncFile asyncFile, long idTimer, Promise<Void> promise) {
vertx.cancelTimer(idTimer);
asyncFile.flush().close(future);
asyncFile.flush().close(promise);
}

private void handleDownload(AsyncResult<Void> downloadResult, Future<Void> future) {
private void handleDownload(AsyncResult<Void> downloadResult, Promise<Void> promise) {
if (downloadResult.failed()) {
retryDownload(future, retryInterval, retryCount);
retryDownload(promise, retryInterval, retryCount);
} else {
future.complete();
promise.complete();
}
}

private void retryDownload(Future<Void> receivedFuture, long retryInterval, long retryCount) {
private void retryDownload(Promise<Void> receivedPromise, long retryInterval, long retryCount) {
logger.info("Set retry {0} to download from {1}. {2} retries left", retryInterval, downloadUrl, retryCount);
vertx.setTimer(retryInterval, retryTimerId -> handleRetry(receivedFuture, retryInterval, retryCount));
vertx.setTimer(retryInterval, retryTimerId -> handleRetry(receivedPromise, retryInterval, retryCount));
}

private void handleRetry(Future<Void> receivedFuture, long retryInterval, long retryCount) {
private void handleRetry(Promise<Void> receivedPromise, long retryInterval, long retryCount) {
if (retryCount > 0) {
final long next = retryCount - 1;
cleanUp(tmpFilePath).compose(ignore -> download())
.setHandler(retryResult -> handleRetryResult(retryInterval, next, retryResult, receivedFuture));
.setHandler(retryResult -> handleRetryResult(retryInterval, next, retryResult, receivedPromise));
} else {
cleanUp(tmpFilePath).setHandler(ignore -> receivedFuture.fail(new PreBidException(
cleanUp(tmpFilePath).setHandler(ignore -> receivedPromise.fail(new PreBidException(
String.format("File sync failed after %s retries", this.retryCount - retryCount))));
}
}

private void handleRetryResult(long retryInterval, long next, AsyncResult<Void> retryResult, Future<Void> future) {
private void handleRetryResult(long retryInterval, long next, AsyncResult<Void> retryResult,
Promise<Void> promise) {
if (retryResult.succeeded()) {
future.complete();
promise.complete();
} else {
retryDownload(future, retryInterval, next);
retryDownload(promise, retryInterval, next);
}
}

private Future<Void> swapFiles() {
final Future<Void> result = Future.future();
final Promise<Void> promise = Promise.promise();
logger.info("Sync {0} to {1}", tmpFilePath, saveFilePath);

final CopyOptions copyOptions = new CopyOptions().setReplaceExisting(true);
fileSystem.move(tmpFilePath, saveFilePath, copyOptions, result);
return result;
fileSystem.move(tmpFilePath, saveFilePath, copyOptions, promise);
return promise.future();
}

private void handleSync(RemoteFileProcessor remoteFileProcessor, AsyncResult<Boolean> syncResult) {
Expand Down Expand Up @@ -329,14 +334,14 @@ private Future<Boolean> tryUpdate() {
}

private Future<Boolean> isNeedToUpdate() {
final Future<Boolean> isNeedToUpdate = Future.future();
final Promise<Boolean> isNeedToUpdate = Promise.promise();
httpClient.headAbs(downloadUrl, response -> checkNewVersion(response, isNeedToUpdate))
.exceptionHandler(isNeedToUpdate::fail)
.end();
return isNeedToUpdate;
return isNeedToUpdate.future();
}

private void checkNewVersion(HttpClientResponse response, Future<Boolean> isNeedToUpdate) {
private void checkNewVersion(HttpClientResponse response, Promise<Boolean> isNeedToUpdate) {
final String contentLengthParameter = response.getHeader(HttpHeaders.CONTENT_LENGTH);
if (StringUtils.isNumeric(contentLengthParameter) && !contentLengthParameter.equals("0")) {
final long contentLength = Long.parseLong(contentLengthParameter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ private void circuitClosed() {

@Override
public Future<GeoInfo> lookup(String ip, Timeout timeout) {
return breaker.execute(future -> geoLocationService.lookup(ip, timeout).setHandler(future));
return breaker.execute(promise -> geoLocationService.lookup(ip, timeout).setHandler(promise));
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/prebid/server/handler/OptoutHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.http.Cookie;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.Cookie;
import io.vertx.ext.web.RoutingContext;
import org.apache.commons.lang3.StringUtils;
import org.prebid.server.cookie.UidsCookie;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/prebid/server/handler/SetuidHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.http.Cookie;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.web.Cookie;
import io.vertx.ext.web.RoutingContext;
import org.apache.commons.lang3.StringUtils;
import org.prebid.server.analytics.AnalyticsReporter;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.prebid.server.health;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.SQLConnection;
Expand Down Expand Up @@ -36,9 +36,9 @@ public String name() {

@Override
void updateStatus() {
final Future<SQLConnection> connectionFuture = Future.future();
jdbcClient.getConnection(connectionFuture.completer());
connectionFuture.setHandler(result ->
final Promise<SQLConnection> connectionPromise = Promise.promise();
jdbcClient.getConnection(connectionPromise);
connectionPromise.future().setHandler(result ->
status = StatusResponse.of(
result.succeeded() ? Status.UP.name() : Status.DOWN.name(),
ZonedDateTime.now(Clock.systemUTC())));
Expand Down
Loading

0 comments on commit d2de2e5

Please sign in to comment.