Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Change-Id: I7c7aa8a2a90a66827c670b295bc26ee0b1b152e4
  • Loading branch information
igorbernstein2 committed Oct 2, 2024
1 parent a08a67b commit 5aa0a63
Show file tree
Hide file tree
Showing 15 changed files with 332 additions and 98 deletions.
Empty file.
51 changes: 51 additions & 0 deletions google-cloud-bigtable/src/main/java/CookieDataBoost.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import com.google.cloud.bigtable.admin.v2.BigtableInstanceAdminClient;
import com.google.cloud.bigtable.admin.v2.models.AppProfile;
import com.google.cloud.bigtable.admin.v2.models.UpdateAppProfileRequest;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.TableId;
import java.io.IOException;

public class CookieDataBoost {
private static final String PROJECT_ID = "google.com:cloud-bigtable-dev";
private static final String INSTANCE_ID = "igorbernstein-dev";
private static final String TABLE_ID = "table2";
private static final String APP_PROFILE_ID = "databoost-test";
private static final String CLUSTER_ID = "igorbernstein-dev-c0";

public static void main(String[] args) throws IOException {
BigtableInstanceAdminClient adminClient = BigtableInstanceAdminClient.create(PROJECT_ID);
// adminClient.createAppProfile(
// CreateAppProfileRequest.of(INSTANCE_ID, APP_PROFILE_ID)
//
// .setIsolationPolicy(AppProfile.DataBoostIsolationReadOnlyPolicy.of(AppProfile.ComputeBillingOwner.HOST_PAYS))
//
// .setRoutingPolicy(AppProfile.SingleClusterRoutingPolicy.of(CLUSTER_ID))
// );
adminClient.updateAppProfile(
UpdateAppProfileRequest.of(INSTANCE_ID, APP_PROFILE_ID)
.setIsolationPolicy(
AppProfile.DataBoostIsolationReadOnlyPolicy.of(
AppProfile.ComputeBillingOwner.HOST_PAYS))
.setRoutingPolicy(AppProfile.SingleClusterRoutingPolicy.of(CLUSTER_ID))
.setIgnoreWarnings(true));

BigtableDataClient client =
BigtableDataClient.create(
BigtableDataSettings.newBuilder()
.setProjectId(PROJECT_ID)
.setInstanceId(INSTANCE_ID)
.setAppProfileId(APP_PROFILE_ID)
.build());
client.readRows(Query.create(TABLE_ID).limit(1)).iterator().next();
adminClient.updateAppProfile(
UpdateAppProfileRequest.of(INSTANCE_ID, APP_PROFILE_ID)
.setIsolationPolicy(AppProfile.StandardIsolationPolicy.of(AppProfile.Priority.HIGH))
.setRoutingPolicy(AppProfile.SingleClusterRoutingPolicy.of(CLUSTER_ID))
.setIgnoreWarnings(true));
client.mutateRow(RowMutation.create(TableId.of(TABLE_ID), "some-row").deleteRow());
client.readRows(Query.create(TABLE_ID).limit(1)).iterator().next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.api.gax.retrying.RetryAlgorithm;
import com.google.api.gax.retrying.RetryingExecutorWithContext;
import com.google.api.gax.retrying.ScheduledRetryingExecutor;
import com.google.api.gax.retrying.SimpleStreamResumptionStrategy;
import com.google.api.gax.rpc.Callables;
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
Expand Down Expand Up @@ -123,6 +124,8 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsPartialErrorRetryAlgorithm;
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.opt.UnaryOverStreamingCallable;
import com.google.cloud.bigtable.data.v2.stub.opt.UnaryOverStreamingTracerModCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsFirstCallable;
Expand Down Expand Up @@ -154,6 +157,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -563,11 +567,13 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>

ReadRowsFirstCallable<RowT> firstRow = new ReadRowsFirstCallable<>(readRowCallable);

UnaryCallable<Query, RowT> traced =
new TracedUnaryCallable<>(
firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow"));

return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
UnaryOverStreamingTracerModCallable modCallable =
new UnaryOverStreamingTracerModCallable(firstRow);
TracedServerStreamingCallable<Query, RowT> traced =
new TracedServerStreamingCallable<>(
modCallable, clientContext.getTracerFactory(), getSpanName("ReadRow"));
UnaryOverStreamingCallable<Query, RowT> unaryAdapter = new UnaryOverStreamingCallable<>(traced);
return unaryAdapter.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -794,8 +800,8 @@ private UnaryCallable<String, List<KeyOffset>> createSampleRowKeysCallable() {
*/
private UnaryCallable<RowMutation, Void> createMutateRowCallable() {
String methodName = "MutateRow";
UnaryCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createUnaryCallable(
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> base =
GrpcRawCallableFactory.createServerStreamingCallable(
GrpcCallSettings.<MutateRowRequest, MutateRowResponse>newBuilder()
.setMethodDescriptor(BigtableGrpc.getMutateRowMethod())
.setParamsExtractor(
Expand All @@ -818,17 +824,26 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
.build(),
settings.mutateRowSettings().getRetryableCodes());

UnaryCallable<MutateRowRequest, MutateRowResponse> withStatsHeaders =
new StatsHeadersUnaryCallable<>(base);
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(base);

UnaryCallable<MutateRowRequest, MutateRowResponse> withBigtableTracer =
new BigtableTracerUnaryCallable<>(withStatsHeaders);
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> withBigtableTracer =
new BigtableTracerStreamingCallable<>(withStatsHeaders);

UnaryCallable<MutateRowRequest, MutateRowResponse> retrying =
withRetries(withBigtableTracer, settings.mutateRowSettings());
ServerStreamingCallable<MutateRowRequest, MutateRowResponse> retrying =
withRetries(
withBigtableTracer,
convertUnaryToServerStreamingSettings(settings.mutateRowSettings()));

return createUserFacingUnaryCallable(
methodName, new MutateRowCallable(retrying, requestContext));
UnaryOverStreamingTracerModCallable<MutateRowRequest, MutateRowResponse> modCallable =
new UnaryOverStreamingTracerModCallable<>(retrying);
TracedServerStreamingCallable<MutateRowRequest, MutateRowResponse> traced =
new TracedServerStreamingCallable<>(
modCallable, clientContext.getTracerFactory(), getSpanName(methodName));
UnaryOverStreamingCallable<MutateRowRequest, MutateRowResponse> callable =
new UnaryOverStreamingCallable<>(traced);
MutateRowCallable mutateRowCallable = new MutateRowCallable(callable, requestContext);
return mutateRowCallable.withDefaultCallContext(clientContext.getDefaultCallContext());
}

/**
Expand Down Expand Up @@ -1512,6 +1527,18 @@ private SpanName getSpanName(String methodName) {
return SpanName.of(CLIENT_NAME, methodName);
}

private <ReqT, RespT>
ServerStreamingCallSettings<ReqT, RespT> convertUnaryToServerStreamingSettings(
UnaryCallSettings<?, ?> unarySettings) {
return ServerStreamingCallSettings.<ReqT, RespT>newBuilder()
.setResumptionStrategy(new SimpleStreamResumptionStrategy<>())
.setRetryableCodes(unarySettings.getRetryableCodes())
.setRetrySettings(unarySettings.getRetrySettings())
.setIdleTimeoutDuration(Duration.ZERO)
.setWaitTimeoutDuration(Duration.ZERO)
.build();
}

@Override
public void close() {
if (closeClientContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.api.core.BetaApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.api.gax.tracing.BaseApiTracer;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -86,4 +87,12 @@ public void setLocations(String zone, String cluster) {
public void grpcChannelQueuedLatencies(long queuedTimeMs) {
// noop
}

public void overrideOperationType(ApiTracerFactory.OperationType operationType) {
// noop
}

public void operationFinishedEarly() {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
class BuiltinMetricsTracer extends BigtableTracer {

private static final String NAME = "java-bigtable/" + Version.VERSION;
private final OperationType operationType;
private volatile OperationType operationType;
private final SpanName spanName;

// Operation level metrics
Expand Down Expand Up @@ -132,6 +132,21 @@ public void close() {}
};
}

@Override
public void overrideOperationType(OperationType operationType) {
this.operationType = operationType;
}

@Override
public void operationFinishedEarly() {
if (attemptTimer.isRunning()) {
attemptTimer.stop();
}
if (operationTimer.isRunning()) {
operationTimer.stop();
}
}

@Override
public void operationSucceeded() {
recordOperationCompletion(null);
Expand Down Expand Up @@ -276,7 +291,11 @@ private void recordOperationCompletion(@Nullable Throwable status) {
if (!opFinished.compareAndSet(false, true)) {
return;
}
operationTimer.stop();

// timer might've been stopped early because due to #operationFinishedEarly()
if (operationTimer.isRunning()) {
operationTimer.stop();
}

boolean isStreaming = operationType == OperationType.ServerStreaming;
String statusStr = Util.extractStatus(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.gax.tracing.ApiTracer;
import com.google.api.gax.tracing.ApiTracerFactory;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -62,6 +63,24 @@ public void close() {
};
}

@Override
public void overrideOperationType(ApiTracerFactory.OperationType operationType) {
for (ApiTracer child : children) {
if (child instanceof BigtableTracer) {
((BigtableTracer) child).overrideOperationType(operationType);
}
}
}

@Override
public void operationFinishedEarly() {
for (ApiTracer child : children) {
if (child instanceof BigtableTracer) {
((BigtableTracer) child).operationFinishedEarly();
}
}
}

@Override
public void operationSucceeded() {
for (ApiTracer child : children) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

class MetricsTracer extends BigtableTracer {

private final OperationType operationType;
private volatile OperationType operationType;

private final Tagger tagger;
private final StatsRecorder stats;
Expand Down Expand Up @@ -84,6 +84,17 @@ public void close() {}
};
}

@Override
public void overrideOperationType(OperationType operationType) {
this.operationType = operationType;
}

@Override
public void operationFinishedEarly() {
attemptTimer.stop();
operationTimer.stop();
}

@Override
public void operationSucceeded() {
recordOperationCompletion(null);
Expand All @@ -103,7 +114,9 @@ private void recordOperationCompletion(@Nullable Throwable throwable) {
if (!opFinished.compareAndSet(false, true)) {
return;
}
operationTimer.stop();
if (operationTimer.isRunning()) {
operationTimer.stop();
}

long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.google.cloud.bigtable.data.v2.stub.opt;

import com.google.api.core.AbstractApiFuture;
import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.api.gax.rpc.UnaryCallable;

public class UnaryOverStreamingCallable<ReqT, RespT> extends UnaryCallable<ReqT, RespT> {
private final ServerStreamingCallable<ReqT, RespT> inner;

public UnaryOverStreamingCallable(ServerStreamingCallable<ReqT, RespT> inner) {
this.inner = inner;
}

@Override
public ApiFuture<RespT> futureCall(ReqT request, ApiCallContext context) {
UnaryFuture<RespT> future = new UnaryFuture<>();
inner.call(request, future.observer, context);
return future;
}

private static class UnaryFuture<RespT> extends AbstractApiFuture<RespT> {
private StreamController controller;

private final ResponseObserver<RespT> observer =
new ResponseObserver<RespT>() {
@Override
public void onStart(StreamController streamController) {
UnaryFuture.this.controller = streamController;
}

@Override
public void onResponse(RespT response) {
UnaryFuture.this.set(response);
}

@Override
public void onError(Throwable throwable) {
UnaryFuture.this.setException(throwable);
}

@Override
public void onComplete() {
UnaryFuture.this.set(null);
}
};

@Override
protected void interruptTask() {
controller.cancel();
}
}
}
Loading

0 comments on commit 5aa0a63

Please sign in to comment.