Skip to content

Commit

Permalink
Remove directly access to singleton
Browse files Browse the repository at this point in the history
  • Loading branch information
GaoleMeng committed Jul 18, 2024
1 parent bf13fe9 commit 320492b
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class ConnectionWorker implements AutoCloseable {
*/
private final RetrySettings retrySettings;

private final RequestProfiler.RequestProfilerHook requestProfilerHook;

private static String projectMatching = "projects/[^/]+/";
private static Pattern streamPatternProject = Pattern.compile(projectMatching);

Expand Down Expand Up @@ -386,7 +388,8 @@ public ConnectionWorker(
String traceId,
@Nullable String compressorName,
BigQueryWriteSettings clientSettings,
RetrySettings retrySettings)
RetrySettings retrySettings,
boolean enableRequestProfiler)
throws IOException {
this.lock = new ReentrantLock();
this.hasMessageInWaitingQueue = lock.newCondition();
Expand All @@ -410,6 +413,7 @@ public ConnectionWorker(
this.compressorName = compressorName;
this.retrySettings = retrySettings;
this.telemetryAttributes = buildOpenTelemetryAttributes();
this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler);
registerOpenTelemetryMetrics();

// Always recreate a client for connection worker.
Expand Down Expand Up @@ -503,7 +507,7 @@ private boolean shouldWaitForBackoff(AppendRequestAndResponse requestWrapper) {

private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper) {
lock.lock();
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
requestProfilerHook.startOperation(
RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId);
try {
Condition condition = lock.newCondition();
Expand All @@ -513,7 +517,7 @@ private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper)
} catch (InterruptedException e) {
throw new IllegalStateException(e);
} finally {
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
requestProfilerHook.endOperation(
RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId);
lock.unlock();
}
Expand All @@ -535,7 +539,7 @@ private void addMessageToWaitingQueue(
++this.inflightRequests;
this.inflightBytes += requestWrapper.messageSize;
hasMessageInWaitingQueue.signal();
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
requestProfilerHook.startOperation(
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
if (addToFront) {
waitingRequestQueue.addFirst(requestWrapper);
Expand Down Expand Up @@ -649,13 +653,12 @@ private ApiFuture<AppendRowsResponse> appendInternal(
writerId));
return requestWrapper.appendResult;
}
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId);
requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId);
++this.inflightRequests;
this.inflightBytes += requestWrapper.messageSize;
waitingRequestQueue.addLast(requestWrapper);
hasMessageInWaitingQueue.signal();
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
requestProfilerHook.startOperation(
RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId);
try {
maybeWaitForInflightQuota();
Expand All @@ -665,7 +668,7 @@ private ApiFuture<AppendRowsResponse> appendInternal(
this.inflightBytes -= requestWrapper.messageSize;
throw ex;
}
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
requestProfilerHook.endOperation(
RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId);
return requestWrapper.appendResult;
} finally {
Expand Down Expand Up @@ -831,10 +834,10 @@ private void appendLoop() {
while (!inflightRequestQueue.isEmpty()) {
AppendRequestAndResponse requestWrapper = inflightRequestQueue.pollLast();
// Consider the backend latency as completed for the current request.
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
requestProfilerHook.endOperation(
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
requestWrapper.requestSendTimeStamp = null;
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
requestProfilerHook.startOperation(
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
waitingRequestQueue.addFirst(requestWrapper);
}
Expand All @@ -845,7 +848,7 @@ private void appendLoop() {
}
while (!this.waitingRequestQueue.isEmpty()) {
AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
requestProfilerHook.endOperation(
RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId);
waitForBackoffIfNecessary(requestWrapper);
this.inflightRequestQueue.add(requestWrapper);
Expand Down Expand Up @@ -931,7 +934,7 @@ private void appendLoop() {
}
firstRequestForTableOrSchemaSwitch = false;

RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
requestProfilerHook.startOperation(
RequestProfiler.OperationName.RESPONSE_LATENCY, requestUniqueId);

// Send should only throw an exception if there is a problem with the request. The catch
Expand Down Expand Up @@ -1212,7 +1215,7 @@ private void requestCallback(AppendRowsResponse response) {
}
if (!this.inflightRequestQueue.isEmpty()) {
requestWrapper = pollFirstInflightRequestQueue();
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
requestProfilerHook.endOperation(
RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId);
} else if (inflightCleanuped) {
// It is possible when requestCallback is called, the inflight queue is already drained
Expand Down Expand Up @@ -1277,7 +1280,7 @@ private void requestCallback(AppendRowsResponse response) {
requestWrapper.appendResult.set(response);
}
} finally {
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
requestProfilerHook.endOperation(
RequestProfiler.OperationName.TOTAL_LATENCY, requestWrapper.requestUniqueId);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ public abstract static class Builder {
/** Static setting for connection pool. */
private static Settings settings = Settings.builder().build();

private final boolean enableRequestProfiler;

ConnectionWorkerPool(
long maxInflightRequests,
long maxInflightBytes,
Expand All @@ -213,7 +215,8 @@ public abstract static class Builder {
String traceId,
@Nullable String comperssorName,
BigQueryWriteSettings clientSettings,
RetrySettings retrySettings) {
RetrySettings retrySettings,
boolean enableRequestProfiler) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.maxRetryDuration = maxRetryDuration;
Expand All @@ -223,6 +226,7 @@ public abstract static class Builder {
this.clientSettings = clientSettings;
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
this.retrySettings = retrySettings;
this.enableRequestProfiler = enableRequestProfiler;
}

/**
Expand Down Expand Up @@ -404,7 +408,8 @@ private ConnectionWorker createConnectionWorker(
traceId,
compressorName,
clientSettings,
retrySettings);
retrySettings,
enableRequestProfiler);
connectionWorkerPool.add(connectionWorker);
log.info(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ enum OperationName {
private static final int MAX_CACHED_REQUEST = 100000;

// Singleton for easier access.
static final RequestProfiler REQUEST_PROFILER_SINGLETON = new RequestProfiler();
private static final RequestProfiler REQUEST_PROFILER_SINGLETON = new RequestProfiler();

// Tunable static variable indicate how many top longest latency requests we should consider.
private static final int DEFAULT_TOP_K = 20;
Expand Down Expand Up @@ -172,33 +172,35 @@ void flushAndPrintReport() {
// Periodically trigger the report generation.
void startPeriodicalReportFlushing() {
this.enableProfiiler = true;
this.flushThread =
new Thread(
new Runnable() {
@Override
public void run() {
try {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(FLUSH_PERIOD.toMillis());
} catch (InterruptedException e) {
log.warning("Flush report thread is interrupted by " + e.toString());
throw new RuntimeException(e);
if (this.flushThread == null || !this.flushThread.isAlive()) {
this.flushThread =
new Thread(
new Runnable() {
@Override
public void run() {
try {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(FLUSH_PERIOD.toMillis());
} catch (InterruptedException e) {
log.warning("Flush report thread is interrupted by " + e.toString());
throw new RuntimeException(e);
}
flushAndPrintReport();
}
flushAndPrintReport();
} catch (Exception ex) {
// Mute any exception thrown from profiler process as we don't want to
// interrupt normal operations.
log.warning(
"Exception thrown request profiler ignored, this is suggesting faulty "
+ "implementation of "
+ "RequestProfiler, exception context: "
+ ex.toString());
}
} catch (Exception ex) {
// Mute any exception thrown from profiler process as we don't want to
// interrupt normal operations.
log.warning(
"Exception thrown request profiler ignored, this is suggesting faulty "
+ "implementation of "
+ "RequestProfiler, exception context: "
+ ex.toString());
}
}
});
this.flushThread.start();
});
this.flushThread.start();
}
}

String flushAndGenerateReportText() {
Expand Down Expand Up @@ -405,4 +407,45 @@ void internalDisableAndClearProfiler() {
public static void disableAndClearProfiler() {
REQUEST_PROFILER_SINGLETON.internalDisableAndClearProfiler();
}

/**
* A hook for easier access to request profiler. Otherwise we have to trigger tedious if clauses
* to check whether profiler is enabled before every caller's trigger of the request profiler.
* This is because profiler is shared statically across instances.
*/
static class RequestProfilerHook {
private boolean enableRequestProfiler = false;

RequestProfilerHook(boolean enableRequestProfiler) {
this.enableRequestProfiler = enableRequestProfiler;
}

// Mimic the api exposed by the main request profiler.
void startOperation(OperationName operationName, String requestUniqueId) {
if (this.enableRequestProfiler) {
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(operationName, requestUniqueId);
}
}

// Mimic the api exposed by the main request profiler.
void endOperation(OperationName operationName, String requestUniqueId) {
if (this.enableRequestProfiler) {
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(operationName, requestUniqueId);
}
}

void startPeriodicalReportFlushing() {
if (this.enableRequestProfiler) {
RequestProfiler.REQUEST_PROFILER_SINGLETON.startPeriodicalReportFlushing();
}
}

String flushAndGenerateReportText() {
return RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText();
}

void enableProfiler() {
REQUEST_PROFILER_SINGLETON.enableProfiler();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ public class SchemaAwareStreamWriter<T> implements AutoCloseable {
// the user provides the table schema, we should always use that schema.
private final boolean skipRefreshStreamWriter;

// Provide access to the request profiler.
private final RequestProfiler.RequestProfilerHook requestProfilerHook;

/**
* Constructs the SchemaAwareStreamWriter
*
Expand Down Expand Up @@ -103,8 +106,10 @@ private SchemaAwareStreamWriter(Builder<T> builder)
streamWriterBuilder.setDefaultMissingValueInterpretation(
builder.defaultMissingValueInterpretation);
streamWriterBuilder.setClientId(builder.clientId);
streamWriterBuilder.setEnableLatencyProfiler(builder.enableRequestProfiler);
requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler);
if (builder.enableRequestProfiler) {
streamWriterBuilder.setEnableLatencyProfiler(builder.enableRequestProfiler);
requestProfilerHook.startPeriodicalReportFlushing();
}
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
Expand All @@ -127,12 +132,12 @@ private SchemaAwareStreamWriter(Builder<T> builder)
public ApiFuture<AppendRowsResponse> append(Iterable<T> items)
throws IOException, DescriptorValidationException {
String requestUniqueId = generateRequestUniqueId();
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
requestProfilerHook.startOperation(
RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId);
try {
return appendWithUniqueId(items, -1, requestUniqueId);
} catch (Exception ex) {
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
requestProfilerHook.endOperation(
RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId);
throw ex;
}
Expand Down Expand Up @@ -197,12 +202,12 @@ private List<DynamicMessage> buildMessage(Iterable<T> items)
public ApiFuture<AppendRowsResponse> append(Iterable<T> items, long offset)
throws IOException, DescriptorValidationException {
String requestUniqueId = generateRequestUniqueId();
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
requestProfilerHook.startOperation(
RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId);
try {
return appendWithUniqueId(items, offset, requestUniqueId);
} catch (Exception ex) {
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
requestProfilerHook.endOperation(
RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId);
throw ex;
}
Expand All @@ -213,7 +218,7 @@ ApiFuture<AppendRowsResponse> appendWithUniqueId(
throws DescriptorValidationException, IOException {
// Handle schema updates in a Thread-safe way by locking down the operation
synchronized (this) {
RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(
requestProfilerHook.startOperation(
RequestProfiler.OperationName.JSON_TO_PROTO_CONVERSION, requestUniqueId);
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
try {
Expand Down Expand Up @@ -246,7 +251,7 @@ ApiFuture<AppendRowsResponse> appendWithUniqueId(
rowIndexToErrorMessage);
}
} finally {
RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(
requestProfilerHook.endOperation(
RequestProfiler.OperationName.JSON_TO_PROTO_CONVERSION, requestUniqueId);
}
return this.streamWriter.appendWithUniqueId(rowsBuilder.build(), offset, requestUniqueId);
Expand Down Expand Up @@ -529,9 +534,6 @@ private Builder(
this.skipRefreshStreamWriter = true;
}
this.toProtoConverter = toProtoConverter;
if (this.enableRequestProfiler) {
RequestProfiler.REQUEST_PROFILER_SINGLETON.startPeriodicalReportFlushing();
}
}

/**
Expand Down
Loading

0 comments on commit 320492b

Please sign in to comment.