Skip to content

Commit

Permalink
[PulsarAdmin] Bookies to async (apache#6529)
Browse files Browse the repository at this point in the history
* Bookies to async

* fix update

* fix path

* Update BookiesImpl.java
  • Loading branch information
yjshen authored and huangdx0726 committed Aug 24, 2020
1 parent 0c972b3 commit a2d5f5e
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;

import java.util.concurrent.CompletableFuture;

/**
* Admin interface for bookies rack placement management.
*/
Expand All @@ -31,18 +33,38 @@ public interface Bookies {
*/
BookiesRackConfiguration getBookiesRackInfo() throws PulsarAdminException;

/**
* Gets the rack placement information for all the bookies in the cluster asynchronously
*/
CompletableFuture<BookiesRackConfiguration> getBookiesRackInfoAsync();

/**
* Gets the rack placement information for a specific bookie in the cluster
*/
BookieInfo getBookieRackInfo(String bookieAddress) throws PulsarAdminException;

/**
* Gets the rack placement information for a specific bookie in the cluster asynchronously
*/
CompletableFuture<BookieInfo> getBookieRackInfoAsync(String bookieAddress);

/**
* Remove rack placement information for a specific bookie in the cluster
*/
void deleteBookieRackInfo(String bookieAddress) throws PulsarAdminException;

/**
* Remove rack placement information for a specific bookie in the cluster asynchronously
*/
CompletableFuture<Void> deleteBookieRackInfoAsync(String bookieAddress);

/**
* Updates the rack placement information for a specific bookie in the cluster
*/
void updateBookieRackInfo(String bookieAddress, String group, BookieInfo bookieInfo) throws PulsarAdminException;

/**
* Updates the rack placement information for a specific bookie in the cluster asynchronously
*/
CompletableFuture<Void> updateBookieRackInfoAsync(String bookieAddress, String group, BookieInfo bookieInfo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.admin.internal;

import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;

Expand All @@ -27,7 +28,11 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.ErrorData;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BookiesImpl extends BaseResource implements Bookies {
private final WebTarget adminBookies;
Expand All @@ -40,38 +45,108 @@ public BookiesImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
@Override
public BookiesRackConfiguration getBookiesRackInfo() throws PulsarAdminException {
try {
return request(adminBookies.path("racks-info")).get(BookiesRackConfiguration.class);
} catch (Exception e) {
throw getApiException(e);
return getBookiesRackInfoAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<BookiesRackConfiguration> getBookiesRackInfoAsync() {
WebTarget path = adminBookies.path("racks-info");
final CompletableFuture<BookiesRackConfiguration> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<BookiesRackConfiguration>() {
@Override
public void completed(BookiesRackConfiguration bookiesRackConfiguration) {
future.complete(bookiesRackConfiguration);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public BookieInfo getBookieRackInfo(String bookieAddress) throws PulsarAdminException {
try {
return request(adminBookies.path("racks-info").path(bookieAddress)).get(BookieInfo.class);
} catch (Exception e) {
throw getApiException(e);
return getBookieRackInfoAsync(bookieAddress).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<BookieInfo> getBookieRackInfoAsync(String bookieAddress) {
WebTarget path = adminBookies.path("racks-info").path(bookieAddress);
final CompletableFuture<BookieInfo> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<BookieInfo>() {
@Override
public void completed(BookieInfo bookieInfo) {
future.complete(bookieInfo);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public void deleteBookieRackInfo(String bookieAddress) throws PulsarAdminException {
try {
request(adminBookies.path("racks-info").path(bookieAddress)).delete(ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
deleteBookieRackInfoAsync(bookieAddress).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> deleteBookieRackInfoAsync(String bookieAddress) {
WebTarget path = adminBookies.path("racks-info").path(bookieAddress);
return asyncDeleteRequest(path);
}

@Override
public void updateBookieRackInfo(String bookieAddress, String group, BookieInfo bookieInfo)
throws PulsarAdminException {
try {
request(adminBookies.path("racks-info").path(bookieAddress).queryParam("group", group))
.post(Entity.entity(bookieInfo, MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
updateBookieRackInfoAsync(bookieAddress, group, bookieInfo).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> updateBookieRackInfoAsync(String bookieAddress, String group, BookieInfo bookieInfo) {
WebTarget path = adminBookies.path("racks-info").path(bookieAddress).queryParam("group", group);
return asyncPostRequest(path, Entity.entity(bookieInfo, MediaType.APPLICATION_JSON));
}

}

0 comments on commit a2d5f5e

Please sign in to comment.