Skip to content

Commit

Permalink
Bigtable: add CRUD for clusters (googleapis#3612)
Browse files Browse the repository at this point in the history
  • Loading branch information
igorbernstein2 authored Sep 4, 2018
1 parent 8bcc89b commit e3eedeb
Show file tree
Hide file tree
Showing 6 changed files with 744 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.bigtable.admin.v2.ClusterName;
import com.google.bigtable.admin.v2.DeleteClusterRequest;
import com.google.bigtable.admin.v2.DeleteInstanceRequest;
import com.google.bigtable.admin.v2.GetClusterRequest;
import com.google.bigtable.admin.v2.GetInstanceRequest;
import com.google.bigtable.admin.v2.InstanceName;
import com.google.bigtable.admin.v2.ListClustersRequest;
import com.google.bigtable.admin.v2.ListClustersResponse;
import com.google.bigtable.admin.v2.ListInstancesRequest;
import com.google.bigtable.admin.v2.ListInstancesResponse;
import com.google.bigtable.admin.v2.LocationName;
import com.google.bigtable.admin.v2.ProjectName;
import com.google.cloud.bigtable.admin.v2.models.Cluster;
import com.google.cloud.bigtable.admin.v2.models.CreateClusterRequest;
import com.google.cloud.bigtable.admin.v2.models.CreateInstanceRequest;
import com.google.cloud.bigtable.admin.v2.models.Instance;
import com.google.cloud.bigtable.admin.v2.models.PartialListClustersException;
import com.google.cloud.bigtable.admin.v2.models.PartialListInstancesException;
import com.google.cloud.bigtable.admin.v2.models.UpdateInstanceRequest;
import com.google.cloud.bigtable.admin.v2.stub.BigtableInstanceAdminStub;
Expand Down Expand Up @@ -397,6 +405,282 @@ public Void apply(Empty input) {
);
}

/**
* Creates a new cluster in the specified instance.
*
* <p>Sample code:
*
* <pre>{@code
* Cluster cluster = client.createCluster(
* CreateClusterRequest.of("my-instance", "my-new-cluster")
* .setZone("us-east1-c")
* .setServeNodes(3)
* .setStorageType(StorageType.SSD)
* );
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public Cluster createCluster(CreateClusterRequest request) {
return awaitFuture(createClusterAsync(request));
}

/**
* Asynchronously creates a new cluster in the specified instance.
*
* <p>Sample code:
*
* <pre>{@code
* ApiFuture<Cluster> clusterFuture = client.createClusterAsync(
* CreateClusterRequest.of("my-instance", "my-new-cluster")
* .setZone("us-east1-c")
* .setServeNodes(3)
* .setStorageType(StorageType.SSD)
* );
*
* Cluster cluster = clusterFuture.get();
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ApiFuture<Cluster> createClusterAsync(CreateClusterRequest request) {
return ApiFutures.transform(
stub.createClusterOperationCallable().futureCall(request.toProto(projectName)),
new ApiFunction<com.google.bigtable.admin.v2.Cluster, Cluster>() {
@Override
public Cluster apply(com.google.bigtable.admin.v2.Cluster proto) {
return Cluster.fromProto(proto);
}
},
MoreExecutors.directExecutor()
);
}

/**
* Get the cluster representation by ID.
*
* <p>Sample code:
*
* <pre>{@code
* Cluster cluster = client.getCluster("my-instance", "my-cluster");
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public Cluster getCluster(String instanceId, String clusterId) {
return awaitFuture(getClusterAsync(instanceId, clusterId));
}

/**
* Asynchronously gets the cluster representation by ID.
*
* <p>Sample code:
*
* <pre>{@code
* ApiFuture<Cluster> clusterFuture = client.getClusterAsync("my-instance", "my-cluster");
* Cluster cluster = clusterFuture.get();
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ApiFuture<Cluster> getClusterAsync(String instanceId, String clusterId) {
ClusterName name = ClusterName.of(projectName.getProject(), instanceId, clusterId);

GetClusterRequest request = GetClusterRequest.newBuilder()
.setName(name.toString())
.build();

return ApiFutures.transform(
stub.getClusterCallable().futureCall(request),
new ApiFunction<com.google.bigtable.admin.v2.Cluster, Cluster>() {
@Override
public Cluster apply(com.google.bigtable.admin.v2.Cluster proto) {
return Cluster.fromProto(proto);
}
},
MoreExecutors.directExecutor()
);
}

/**
* Lists all clusters in the specified instance.
*
* <p>This method will throw a {@link PartialListClustersException} when any zone is
* unavailable. If partial listing are ok, the exception can be caught and inspected.
*
* <p>Sample code:
*
* <pre>{@code
* try {
* List<Cluster> clusters = cluster.listClusters("my-instance");
* } catch (PartialListClustersException e) {
* System.out.println("The following zones are unavailable: " + e.getUnavailableZones());
* System.out.println("But the following clusters are reachable: " + e.getClusters())
* }
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public List<Cluster> listClusters(String instanceId) {
return awaitFuture(listClustersAsync(instanceId));
}

/**
* Asynchronously lists all clusters in the specified instance.
*
* <p>This method will throw a {@link PartialListClustersException} when any zone is
* unavailable. If partial listing are ok, the exception can be caught and inspected.
*
* <p>Sample code:
*
* <pre>{@code
* ApiFuture<Cluster> clustersFuture = client.listClustersAsync("my-instance");
*
* ApiFutures.addCallback(clustersFuture, new ApiFutureCallback<List<Cluster>>() {
* public void onFailure(Throwable t) {
* if (t instanceof PartialListClustersException) {
* PartialListClustersException partialError = (PartialListClustersException)t;
* System.out.println("The following zones are unavailable: " + partialError.getUnavailableZones());
* System.out.println("But the following clusters are reachable: " + partialError.getClusters());
* } else {
* t.printStackTrace();
* }
* }
*
* public void onSuccess(List<Cluster> result) {
* System.out.println("Found a complete set of instances: " + result);
* }
* }, MoreExecutors.directExecutor());
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ApiFuture<List<Cluster>> listClustersAsync(String instanceId) {
InstanceName name = InstanceName.of(projectName.getProject(), instanceId);
ListClustersRequest request = ListClustersRequest.newBuilder()
.setParent(name.toString())
.build();

return ApiFutures.transform(
stub.listClustersCallable().futureCall(request),
new ApiFunction<ListClustersResponse, List<Cluster>>() {
@Override
public List<Cluster> apply(ListClustersResponse proto) {
// NOTE: serverside pagination is not and will not be implemented, so remaining pages
// are not fetched. However, if that assumption turns out to be wrong, fail fast to
// avoid returning partial data.
Verify.verify(proto.getNextPageToken().isEmpty(),
"Server returned an unexpected paginated response");

ImmutableList.Builder<Cluster> clusters = ImmutableList.builder();
for (com.google.bigtable.admin.v2.Cluster cluster : proto.getClustersList()) {
clusters.add(Cluster.fromProto(cluster));
}

ImmutableList.Builder<String> failedZones = ImmutableList.builder();
for (String locationStr : proto.getFailedLocationsList()) {
LocationName fullLocation = Objects.requireNonNull(LocationName.parse(locationStr));
failedZones.add(fullLocation.getLocation());
}

if (!failedZones.build().isEmpty()) {
throw new PartialListClustersException(failedZones.build(), clusters.build());
}

return clusters.build();
}
},
MoreExecutors.directExecutor()
);
}

/**
* Resizes the cluster's node count. Please note that only clusters that belong to a PRODUCTION
* instance can be resized.
*
* <p>Sample code:
*
* <pre>{@code
* Cluster cluster = cluster.resizeCluster("my-instance", "my-cluster", 30);
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public Cluster resizeCluster(String instanceId, String clusterId, int numServeNodes) {
return awaitFuture(resizeClusterAsync(instanceId, clusterId, numServeNodes));
}

/**
* Asynchronously resizes the cluster's node count. Please note that only clusters that belong to
* a PRODUCTION instance can be resized.
*
* <pre>{@code
* ApiFuture<Cluster> clusterFuture = cluster.resizeCluster("my-instance", "my-cluster", 30);
* Cluster cluster = clusterFuture.get();
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ApiFuture<Cluster> resizeClusterAsync(String instanceId, String clusterId,
int numServeNodes) {

ClusterName name = ClusterName.of(projectName.getProject(), instanceId, clusterId);

com.google.bigtable.admin.v2.Cluster request = com.google.bigtable.admin.v2.Cluster.newBuilder()
.setName(name.toString())
.setServeNodes(numServeNodes)
.build();

return ApiFutures.transform(
stub.updateClusterOperationCallable().futureCall(request),
new ApiFunction<com.google.bigtable.admin.v2.Cluster, Cluster>() {
@Override
public Cluster apply(com.google.bigtable.admin.v2.Cluster proto) {
return Cluster.fromProto(proto);
}
},
MoreExecutors.directExecutor()
);
}

/**
* Deletes the specified cluster. Please note that an instance must have at least 1 cluster. To
* remove the last cluster, please use {@link BigtableInstanceAdminClient#deleteInstance(String)}.
*
* <p>Sample code:
*
* <pre>{@code
* client.deleteCluster("my-instance", "my-cluster");
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public void deleteCluster(String instanceId, String clusterId) {
awaitFuture(deleteClusterAsync(instanceId, clusterId));
}

/**
* Asynchronously deletes the specified cluster. Please note that an instance must have at least 1
* cluster. To remove the last cluster, please use {@link BigtableInstanceAdminClient#deleteInstanceAsync(String)}.
*
* <p>Sample code:
*
* <pre>{@code
* ApiFuture<Void> future = client.deleteClusterAsync("my-instance", "my-cluster");
* future.get();
* }</pre>
*/
@SuppressWarnings("WeakerAccess")
public ApiFuture<Void> deleteClusterAsync(String instanceId, String clusterId) {
ClusterName name = ClusterName.of(projectName.getProject(), instanceId, clusterId);

DeleteClusterRequest request = DeleteClusterRequest.newBuilder()
.setName(name.toString())
.build();

return ApiFutures.transform(
stub.deleteClusterCallable().futureCall(request),
new ApiFunction<Empty, Void>() {
@Override
public Void apply(Empty input) {
return null;
}
},
MoreExecutors.directExecutor()
);
}

/**
* Awaits the result of a future, taking care to propagate errors while maintaining the call site
* in a suppressed exception. This allows semantic errors to be caught across threads, while
Expand All @@ -406,7 +690,6 @@ public Void apply(Empty input) {
// TODO(igorbernstein2): try to move this into gax
private <T> T awaitFuture(ApiFuture<T> future) {
RuntimeException error;

try {
return Futures.getUnchecked(future);
} catch (UncheckedExecutionException e) {
Expand All @@ -418,10 +701,8 @@ private <T> T awaitFuture(ApiFuture<T> future) {
} catch (RuntimeException e) {
error = e;
}

// Add the caller's stack as a suppressed exception
error.addSuppressed(new RuntimeException("Encountered error while awaiting future"));

throw error;
}
}
Loading

0 comments on commit e3eedeb

Please sign in to comment.