diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java index ccf3d2db24..af6cbe8c3a 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java @@ -247,6 +247,8 @@ class ConnectionWorker implements AutoCloseable { */ private final RetrySettings retrySettings; + private final RequestProfiler.RequestProfilerHook requestProfilerHook; + private static String projectMatching = "projects/[^/]+/"; private static Pattern streamPatternProject = Pattern.compile(projectMatching); @@ -386,7 +388,8 @@ public ConnectionWorker( String traceId, @Nullable String compressorName, BigQueryWriteSettings clientSettings, - RetrySettings retrySettings) + RetrySettings retrySettings, + boolean enableRequestProfiler) throws IOException { this.lock = new ReentrantLock(); this.hasMessageInWaitingQueue = lock.newCondition(); @@ -410,6 +413,7 @@ public ConnectionWorker( this.compressorName = compressorName; this.retrySettings = retrySettings; this.telemetryAttributes = buildOpenTelemetryAttributes(); + this.requestProfilerHook = new RequestProfiler.RequestProfilerHook(enableRequestProfiler); registerOpenTelemetryMetrics(); // Always recreate a client for connection worker. @@ -503,7 +507,7 @@ private boolean shouldWaitForBackoff(AppendRequestAndResponse requestWrapper) { private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper) { lock.lock(); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( + requestProfilerHook.startOperation( RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId); try { Condition condition = lock.newCondition(); @@ -513,7 +517,7 @@ private void waitForBackoffIfNecessary(AppendRequestAndResponse requestWrapper) } catch (InterruptedException e) { throw new IllegalStateException(e); } finally { - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( + requestProfilerHook.endOperation( RequestProfiler.OperationName.RETRY_BACKOFF, requestWrapper.requestUniqueId); lock.unlock(); } @@ -535,7 +539,7 @@ private void addMessageToWaitingQueue( ++this.inflightRequests; this.inflightBytes += requestWrapper.messageSize; hasMessageInWaitingQueue.signal(); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( + requestProfilerHook.startOperation( RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId); if (addToFront) { waitingRequestQueue.addFirst(requestWrapper); @@ -649,13 +653,12 @@ private ApiFuture appendInternal( writerId)); return requestWrapper.appendResult; } - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId); + requestProfilerHook.startOperation(RequestProfiler.OperationName.WAIT_QUEUE, requestUniqueId); ++this.inflightRequests; this.inflightBytes += requestWrapper.messageSize; waitingRequestQueue.addLast(requestWrapper); hasMessageInWaitingQueue.signal(); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( + requestProfilerHook.startOperation( RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId); try { maybeWaitForInflightQuota(); @@ -665,7 +668,7 @@ private ApiFuture appendInternal( this.inflightBytes -= requestWrapper.messageSize; throw ex; } - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( + requestProfilerHook.endOperation( RequestProfiler.OperationName.WAIT_INFLIGHT_QUOTA, requestUniqueId); return requestWrapper.appendResult; } finally { @@ -831,10 +834,10 @@ private void appendLoop() { while (!inflightRequestQueue.isEmpty()) { AppendRequestAndResponse requestWrapper = inflightRequestQueue.pollLast(); // Consider the backend latency as completed for the current request. - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( + requestProfilerHook.endOperation( RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId); requestWrapper.requestSendTimeStamp = null; - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( + requestProfilerHook.startOperation( RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId); waitingRequestQueue.addFirst(requestWrapper); } @@ -845,7 +848,7 @@ private void appendLoop() { } while (!this.waitingRequestQueue.isEmpty()) { AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst(); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( + requestProfilerHook.endOperation( RequestProfiler.OperationName.WAIT_QUEUE, requestWrapper.requestUniqueId); waitForBackoffIfNecessary(requestWrapper); this.inflightRequestQueue.add(requestWrapper); @@ -931,7 +934,7 @@ private void appendLoop() { } firstRequestForTableOrSchemaSwitch = false; - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( + requestProfilerHook.startOperation( RequestProfiler.OperationName.RESPONSE_LATENCY, requestUniqueId); // Send should only throw an exception if there is a problem with the request. The catch @@ -1212,7 +1215,7 @@ private void requestCallback(AppendRowsResponse response) { } if (!this.inflightRequestQueue.isEmpty()) { requestWrapper = pollFirstInflightRequestQueue(); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( + requestProfilerHook.endOperation( RequestProfiler.OperationName.RESPONSE_LATENCY, requestWrapper.requestUniqueId); } else if (inflightCleanuped) { // It is possible when requestCallback is called, the inflight queue is already drained @@ -1277,7 +1280,7 @@ private void requestCallback(AppendRowsResponse response) { requestWrapper.appendResult.set(response); } } finally { - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( + requestProfilerHook.endOperation( RequestProfiler.OperationName.TOTAL_LATENCY, requestWrapper.requestUniqueId); } }); diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index b9e91b5840..6e5188fb7d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -205,6 +205,8 @@ public abstract static class Builder { /** Static setting for connection pool. */ private static Settings settings = Settings.builder().build(); + private final boolean enableRequestProfiler; + ConnectionWorkerPool( long maxInflightRequests, long maxInflightBytes, @@ -213,7 +215,8 @@ public abstract static class Builder { String traceId, @Nullable String comperssorName, BigQueryWriteSettings clientSettings, - RetrySettings retrySettings) { + RetrySettings retrySettings, + boolean enableRequestProfiler) { this.maxInflightRequests = maxInflightRequests; this.maxInflightBytes = maxInflightBytes; this.maxRetryDuration = maxRetryDuration; @@ -223,6 +226,7 @@ public abstract static class Builder { this.clientSettings = clientSettings; this.currentMaxConnectionCount = settings.minConnectionsPerRegion(); this.retrySettings = retrySettings; + this.enableRequestProfiler = enableRequestProfiler; } /** @@ -404,7 +408,8 @@ private ConnectionWorker createConnectionWorker( traceId, compressorName, clientSettings, - retrySettings); + retrySettings, + enableRequestProfiler); connectionWorkerPool.add(connectionWorker); log.info( String.format( diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/RequestProfiler.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/RequestProfiler.java index 361f55d1dd..7e0e8531b3 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/RequestProfiler.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/RequestProfiler.java @@ -80,7 +80,7 @@ enum OperationName { private static final int MAX_CACHED_REQUEST = 100000; // Singleton for easier access. - static final RequestProfiler REQUEST_PROFILER_SINGLETON = new RequestProfiler(); + private static final RequestProfiler REQUEST_PROFILER_SINGLETON = new RequestProfiler(); // Tunable static variable indicate how many top longest latency requests we should consider. private static final int DEFAULT_TOP_K = 20; @@ -172,33 +172,35 @@ void flushAndPrintReport() { // Periodically trigger the report generation. void startPeriodicalReportFlushing() { this.enableProfiiler = true; - this.flushThread = - new Thread( - new Runnable() { - @Override - public void run() { - try { - while (true) { - try { - TimeUnit.MILLISECONDS.sleep(FLUSH_PERIOD.toMillis()); - } catch (InterruptedException e) { - log.warning("Flush report thread is interrupted by " + e.toString()); - throw new RuntimeException(e); + if (this.flushThread == null || !this.flushThread.isAlive()) { + this.flushThread = + new Thread( + new Runnable() { + @Override + public void run() { + try { + while (true) { + try { + TimeUnit.MILLISECONDS.sleep(FLUSH_PERIOD.toMillis()); + } catch (InterruptedException e) { + log.warning("Flush report thread is interrupted by " + e.toString()); + throw new RuntimeException(e); + } + flushAndPrintReport(); } - flushAndPrintReport(); + } catch (Exception ex) { + // Mute any exception thrown from profiler process as we don't want to + // interrupt normal operations. + log.warning( + "Exception thrown request profiler ignored, this is suggesting faulty " + + "implementation of " + + "RequestProfiler, exception context: " + + ex.toString()); } - } catch (Exception ex) { - // Mute any exception thrown from profiler process as we don't want to - // interrupt normal operations. - log.warning( - "Exception thrown request profiler ignored, this is suggesting faulty " - + "implementation of " - + "RequestProfiler, exception context: " - + ex.toString()); } - } - }); - this.flushThread.start(); + }); + this.flushThread.start(); + } } String flushAndGenerateReportText() { @@ -402,7 +404,48 @@ void internalDisableAndClearProfiler() { FLUSH_PERIOD = DEFAULT_FLUSH_PERIOD; } - public static void disableAndClearProfiler() { + public static void disableAndResetProfiler() { REQUEST_PROFILER_SINGLETON.internalDisableAndClearProfiler(); } + + /** + * A hook for easier access to request profiler. Otherwise we have to trigger tedious if clauses + * to check whether profiler is enabled before every caller's trigger of the request profiler. + * This is because profiler is shared statically across instances. + */ + static class RequestProfilerHook { + private boolean enableRequestProfiler = false; + + RequestProfilerHook(boolean enableRequestProfiler) { + this.enableRequestProfiler = enableRequestProfiler; + } + + // Mimic the api exposed by the main request profiler. + void startOperation(OperationName operationName, String requestUniqueId) { + if (this.enableRequestProfiler) { + RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation(operationName, requestUniqueId); + } + } + + // Mimic the api exposed by the main request profiler. + void endOperation(OperationName operationName, String requestUniqueId) { + if (this.enableRequestProfiler) { + RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation(operationName, requestUniqueId); + } + } + + void startPeriodicalReportFlushing() { + if (this.enableRequestProfiler) { + RequestProfiler.REQUEST_PROFILER_SINGLETON.startPeriodicalReportFlushing(); + } + } + + String flushAndGenerateReportText() { + return RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText(); + } + + void enableProfiler() { + REQUEST_PROFILER_SINGLETON.enableProfiler(); + } + } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java index 9db44b22b9..c021980df5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java @@ -71,6 +71,9 @@ public class SchemaAwareStreamWriter implements AutoCloseable { // the user provides the table schema, we should always use that schema. private final boolean skipRefreshStreamWriter; + // Provide access to the request profiler. + private final RequestProfiler.RequestProfilerHook requestProfilerHook; + /** * Constructs the SchemaAwareStreamWriter * @@ -103,8 +106,10 @@ private SchemaAwareStreamWriter(Builder builder) streamWriterBuilder.setDefaultMissingValueInterpretation( builder.defaultMissingValueInterpretation); streamWriterBuilder.setClientId(builder.clientId); + streamWriterBuilder.setEnableLatencyProfiler(builder.enableRequestProfiler); + requestProfilerHook = new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler); if (builder.enableRequestProfiler) { - streamWriterBuilder.setEnableLatencyProfiler(builder.enableRequestProfiler); + requestProfilerHook.startPeriodicalReportFlushing(); } this.streamWriter = streamWriterBuilder.build(); this.streamName = builder.streamName; @@ -127,12 +132,12 @@ private SchemaAwareStreamWriter(Builder builder) public ApiFuture append(Iterable items) throws IOException, DescriptorValidationException { String requestUniqueId = generateRequestUniqueId(); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( + requestProfilerHook.startOperation( RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId); try { return appendWithUniqueId(items, -1, requestUniqueId); } catch (Exception ex) { - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( + requestProfilerHook.endOperation( RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId); throw ex; } @@ -197,12 +202,12 @@ private List buildMessage(Iterable items) public ApiFuture append(Iterable items, long offset) throws IOException, DescriptorValidationException { String requestUniqueId = generateRequestUniqueId(); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( + requestProfilerHook.startOperation( RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId); try { return appendWithUniqueId(items, offset, requestUniqueId); } catch (Exception ex) { - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( + requestProfilerHook.endOperation( RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId); throw ex; } @@ -213,7 +218,7 @@ ApiFuture appendWithUniqueId( throws DescriptorValidationException, IOException { // Handle schema updates in a Thread-safe way by locking down the operation synchronized (this) { - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( + requestProfilerHook.startOperation( RequestProfiler.OperationName.JSON_TO_PROTO_CONVERSION, requestUniqueId); ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); try { @@ -246,7 +251,7 @@ ApiFuture appendWithUniqueId( rowIndexToErrorMessage); } } finally { - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( + requestProfilerHook.endOperation( RequestProfiler.OperationName.JSON_TO_PROTO_CONVERSION, requestUniqueId); } return this.streamWriter.appendWithUniqueId(rowsBuilder.build(), offset, requestUniqueId); @@ -529,9 +534,6 @@ private Builder( this.skipRefreshStreamWriter = true; } this.toProtoConverter = toProtoConverter; - if (this.enableRequestProfiler) { - RequestProfiler.REQUEST_PROFILER_SINGLETON.startPeriodicalReportFlushing(); - } } /** diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 8b3732f0df..5b73b3e918 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -122,6 +122,9 @@ public class StreamWriter implements AutoCloseable { /** Creation timestamp of this streamwriter */ private final long creationTimestamp; + /** Provide access to the request profiler tool. */ + private final RequestProfiler.RequestProfilerHook requestProfilerHook; + private Lock lock; /** The maximum size of one request. Defined by the API. */ @@ -227,11 +230,13 @@ private StreamWriter(Builder builder) throws IOException { this.writerSchema = builder.writerSchema; this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation; BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder); + this.requestProfilerHook = + new RequestProfiler.RequestProfilerHook(builder.enableRequestProfiler); if (builder.enableRequestProfiler) { - // Request profiler is enabled on singleton level, from now on a periodical flush will happen - // to generate - // detailed latency reports for requests latency. - RequestProfiler.REQUEST_PROFILER_SINGLETON.startPeriodicalReportFlushing(); + // Request profiler is enabled on singleton level, from now on a periodical flush will be + // started + // to generate detailed latency reports for requests latency. + requestProfilerHook.startPeriodicalReportFlushing(); } if (!builder.enableConnectionPool) { this.location = builder.location; @@ -248,7 +253,8 @@ private StreamWriter(Builder builder) throws IOException { builder.getFullTraceId(), builder.compressorName, clientSettings, - builder.retrySettings)); + builder.retrySettings, + builder.enableRequestProfiler)); } else { if (!isDefaultStream(streamName)) { log.warning( @@ -314,7 +320,8 @@ private StreamWriter(Builder builder) throws IOException { builder.getFullTraceId(), builder.compressorName, client.getSettings(), - builder.retrySettings); + builder.retrySettings, + builder.enableRequestProfiler); })); validateFetchedConnectonPool(builder); // If the client is not from outside, then shutdown the client we created. @@ -461,12 +468,12 @@ public ApiFuture append(ProtoRows rows) { */ public ApiFuture append(ProtoRows rows, long offset) { String requestUniqueId = generateRequestUniqueId(); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( + requestProfilerHook.startOperation( RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId); try { return appendWithUniqueId(rows, offset, requestUniqueId); } catch (Exception ex) { - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( + requestProfilerHook.endOperation( RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId); throw ex; } @@ -487,7 +494,7 @@ ApiFuture appendWithUniqueId( .withDescription("User closed StreamWriter"), streamName, getWriterId())); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( + requestProfilerHook.endOperation( RequestProfiler.OperationName.TOTAL_LATENCY, requestUniqueId); return requestWrapper.appendResult; } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index dd1b04d1fa..c628d9b23b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -564,6 +564,7 @@ ConnectionWorkerPool createConnectionWorkerPool( TEST_TRACE_ID, null, clientSettings, - retrySettings); + retrySettings, + /*enableRequestProfiler=*/ false); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java index 1b6edf4043..6bd3d93b1a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java @@ -344,7 +344,8 @@ public void testAppendButInflightQueueFull() throws Exception { TEST_TRACE_ID, null, client.getSettings(), - retrySettings); + retrySettings, + /*enableRequestProfiler=*/ false); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -401,7 +402,8 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception { TEST_TRACE_ID, null, client.getSettings(), - retrySettings); + retrySettings, + /*enableRequestProfiler=*/ false); testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1)); ConnectionWorker.setMaxInflightQueueWaitTime(500); @@ -470,7 +472,8 @@ public void testLocationMismatch() throws Exception { TEST_TRACE_ID, null, client.getSettings(), - retrySettings); + retrySettings, + /*enableRequestProfiler=*/ false); StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -502,7 +505,8 @@ public void testStreamNameMismatch() throws Exception { TEST_TRACE_ID, null, client.getSettings(), - retrySettings); + retrySettings, + /*enableRequestProfiler=*/ false); StatusRuntimeException ex = assertThrows( StatusRuntimeException.class, @@ -555,7 +559,8 @@ private ConnectionWorker createConnectionWorker( TEST_TRACE_ID, null, client.getSettings(), - retrySettings); + retrySettings, + /*enableRequestProfiler=*/ false); } private ProtoSchema createProtoSchema(String protoName) { @@ -651,7 +656,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E TEST_TRACE_ID, null, client.getSettings(), - retrySettings); + retrySettings, + /*enableRequestProfiler=*/ false); org.threeten.bp.Duration durationSleep = org.threeten.bp.Duration.ofSeconds(2); testBigQueryWrite.setResponseSleep(durationSleep); @@ -726,7 +732,8 @@ public void testLongTimeIdleWontFail() throws Exception { TEST_TRACE_ID, null, client.getSettings(), - retrySettings); + retrySettings, + /*enableRequestProfiler=*/ false); long appendCount = 10; for (int i = 0; i < appendCount * 2; i++) { @@ -771,7 +778,8 @@ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, S null, null, client.getSettings(), - retrySettings); + retrySettings, + /*enableRequestProfiler=*/ false); Attributes attributes = connectionWorker.getTelemetryAttributes(); String attributesTableId = attributes.get(ConnectionWorker.telemetryKeyTableId); @@ -811,7 +819,8 @@ void exerciseOpenTelemetryAttributesWithTraceId( traceId, null, client.getSettings(), - retrySettings); + retrySettings, + /*enableRequestProfiler=*/ false); Attributes attributes = connectionWorker.getTelemetryAttributes(); checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java index 9587512b63..ce699941a3 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/RequestProfilerTest.java @@ -38,101 +38,85 @@ public class RequestProfilerTest { private static final Logger log = Logger.getLogger(RequestProfiler.class.getName()); + private RequestProfiler.RequestProfilerHook profilerHook = + new RequestProfiler.RequestProfilerHook(true); + @Before public void setup() { - RequestProfiler.REQUEST_PROFILER_SINGLETON.disableAndClearProfiler(); - RequestProfiler.REQUEST_PROFILER_SINGLETON.enableProfiler(); + RequestProfiler.disableAndResetProfiler(); + profilerHook.enableProfiler(); } @After - public void cleanup() { - RequestProfiler.REQUEST_PROFILER_SINGLETON.disableAndClearProfiler(); + public void close() { + RequestProfiler.disableAndResetProfiler(); } @Test public void testNormalCase() throws Exception { - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.TOTAL_LATENCY, "request_1"); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.JSON_TO_PROTO_CONVERSION, "request_1"); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.JSON_TO_PROTO_CONVERSION, "request_1"); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.RESPONSE_LATENCY, "request_1"); + profilerHook.startOperation(OperationName.TOTAL_LATENCY, "request_1"); + profilerHook.startOperation(OperationName.JSON_TO_PROTO_CONVERSION, "request_1"); + profilerHook.endOperation(OperationName.JSON_TO_PROTO_CONVERSION, "request_1"); + profilerHook.startOperation(OperationName.RESPONSE_LATENCY, "request_1"); // Another request starts in the middle - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.TOTAL_LATENCY, "request_2"); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.JSON_TO_PROTO_CONVERSION, "request_2"); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.JSON_TO_PROTO_CONVERSION, "request_2"); + profilerHook.startOperation(OperationName.TOTAL_LATENCY, "request_2"); + profilerHook.startOperation(OperationName.JSON_TO_PROTO_CONVERSION, "request_2"); + profilerHook.endOperation(OperationName.JSON_TO_PROTO_CONVERSION, "request_2"); // Continue request 1 - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.RESPONSE_LATENCY, "request_1"); + profilerHook.endOperation(OperationName.RESPONSE_LATENCY, "request_1"); // Continue request 2 - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.RESPONSE_LATENCY, "request_2"); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.RESPONSE_LATENCY, "request_2"); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.TOTAL_LATENCY, "request_2"); + profilerHook.startOperation(OperationName.RESPONSE_LATENCY, "request_2"); + profilerHook.endOperation(OperationName.RESPONSE_LATENCY, "request_2"); + profilerHook.endOperation(OperationName.TOTAL_LATENCY, "request_2"); // Continue request 1 - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.TOTAL_LATENCY, "request_1"); + profilerHook.endOperation(OperationName.TOTAL_LATENCY, "request_1"); // Test the report generated. - String reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText(); + String reportText = profilerHook.flushAndGenerateReportText(); + log.info(reportText); assertTrue(reportText.contains("Request uuid: request_1 with total time")); assertTrue(reportText.contains("Operation name json_to_proto_conversion starts at")); assertTrue(reportText.contains("Operation name response_latency starts at")); assertTrue(reportText.contains("Request uuid: request_2 with total time")); // Second time flush is called, it should generate empty report. - reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText(); + reportText = profilerHook.flushAndGenerateReportText(); assertTrue(reportText.contains("0 requests finished during")); } @Test public void mixFinishedAndUnfinishedRequest() throws Exception { // Start request 1. - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.TOTAL_LATENCY, "request_1"); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.JSON_TO_PROTO_CONVERSION, "request_1"); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.JSON_TO_PROTO_CONVERSION, "request_1"); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.RESPONSE_LATENCY, "request_1"); + profilerHook.startOperation(OperationName.TOTAL_LATENCY, "request_1"); + profilerHook.startOperation(OperationName.JSON_TO_PROTO_CONVERSION, "request_1"); + profilerHook.endOperation(OperationName.JSON_TO_PROTO_CONVERSION, "request_1"); + profilerHook.startOperation(OperationName.RESPONSE_LATENCY, "request_1"); // Another request starts in the middle - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.TOTAL_LATENCY, "request_2"); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.JSON_TO_PROTO_CONVERSION, "request_2"); + profilerHook.startOperation(OperationName.TOTAL_LATENCY, "request_2"); + profilerHook.startOperation(OperationName.JSON_TO_PROTO_CONVERSION, "request_2"); // First report should be empty since no requests end. - String reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText(); + String reportText = profilerHook.flushAndGenerateReportText(); assertTrue(reportText.contains("0 requests finished during")); // End one of them. - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.TOTAL_LATENCY, "request_1"); - reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText(); + profilerHook.endOperation(OperationName.TOTAL_LATENCY, "request_1"); + reportText = profilerHook.flushAndGenerateReportText(); assertTrue(reportText.contains("Request uuid: request_1 with total time")); // End another, expect the first request's log not showing up. - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.TOTAL_LATENCY, "request_2"); - reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText(); + profilerHook.endOperation(OperationName.TOTAL_LATENCY, "request_2"); + reportText = profilerHook.flushAndGenerateReportText(); assertTrue(!reportText.contains("Request uuid: request_1 with total time")); assertTrue(reportText.contains("Request uuid: request_2 with total time")); // Flush again will be empty report. - reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText(); + reportText = profilerHook.flushAndGenerateReportText(); assertTrue(reportText.contains("0 requests finished during")); } @@ -153,10 +137,8 @@ public void concurrentProfilingTest_1000ReqsRunTogether() throws Exception { threadPool.submit( () -> { String uuid = String.format("request_%s", finalI); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.TOTAL_LATENCY, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.JSON_TO_PROTO_CONVERSION, uuid); + profilerHook.startOperation(OperationName.TOTAL_LATENCY, uuid); + profilerHook.startOperation(OperationName.JSON_TO_PROTO_CONVERSION, uuid); if (slowRequestIndex.contains(finalI)) { try { TimeUnit.MILLISECONDS.sleep(finalI * 100); @@ -164,18 +146,12 @@ public void concurrentProfilingTest_1000ReqsRunTogether() throws Exception { throw new RuntimeException(e); } } - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.JSON_TO_PROTO_CONVERSION, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.JSON_TO_PROTO_CONVERSION, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.JSON_TO_PROTO_CONVERSION, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.WAIT_QUEUE, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.WAIT_QUEUE, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.TOTAL_LATENCY, uuid); + profilerHook.endOperation(OperationName.JSON_TO_PROTO_CONVERSION, uuid); + profilerHook.startOperation(OperationName.JSON_TO_PROTO_CONVERSION, uuid); + profilerHook.endOperation(OperationName.JSON_TO_PROTO_CONVERSION, uuid); + profilerHook.startOperation(OperationName.WAIT_QUEUE, uuid); + profilerHook.endOperation(OperationName.WAIT_QUEUE, uuid); + profilerHook.endOperation(OperationName.TOTAL_LATENCY, uuid); })); } @@ -183,7 +159,7 @@ public void concurrentProfilingTest_1000ReqsRunTogether() throws Exception { for (int i = 0; i < futures.size(); i++) { futures.get(i).get(); } - String reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText(); + String reportText = profilerHook.flushAndGenerateReportText(); assertTrue(reportText.contains("During the last 60000 milliseconds at system time")); assertTrue(reportText.contains("in total 1000 requests finished")); assertTrue(reportText.contains("Request uuid: request_50 with total time")); @@ -211,27 +187,18 @@ public void concurrentProfilingTest_RunWhileFlushing() throws Exception { () -> { try { String uuid = String.format("request_%s", finalI); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.TOTAL_LATENCY, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.JSON_TO_PROTO_CONVERSION, uuid); + profilerHook.startOperation(OperationName.TOTAL_LATENCY, uuid); + profilerHook.startOperation(OperationName.JSON_TO_PROTO_CONVERSION, uuid); if (slowRequestIndex.contains(finalI)) { TimeUnit.MILLISECONDS.sleep(finalI * 100); } - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.JSON_TO_PROTO_CONVERSION, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.JSON_TO_PROTO_CONVERSION, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.JSON_TO_PROTO_CONVERSION, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.startOperation( - OperationName.WAIT_QUEUE, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.WAIT_QUEUE, uuid); - RequestProfiler.REQUEST_PROFILER_SINGLETON.endOperation( - OperationName.TOTAL_LATENCY, uuid); - String unused = - RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText(); + profilerHook.endOperation(OperationName.JSON_TO_PROTO_CONVERSION, uuid); + profilerHook.startOperation(OperationName.JSON_TO_PROTO_CONVERSION, uuid); + profilerHook.endOperation(OperationName.JSON_TO_PROTO_CONVERSION, uuid); + profilerHook.startOperation(OperationName.WAIT_QUEUE, uuid); + profilerHook.endOperation(OperationName.WAIT_QUEUE, uuid); + profilerHook.endOperation(OperationName.TOTAL_LATENCY, uuid); + String unused = profilerHook.flushAndGenerateReportText(); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -242,7 +209,7 @@ public void concurrentProfilingTest_RunWhileFlushing() throws Exception { for (int i = 0; i < futures.size(); i++) { futures.get(i).get(); } - String reportText = RequestProfiler.REQUEST_PROFILER_SINGLETON.flushAndGenerateReportText(); + String reportText = profilerHook.flushAndGenerateReportText(); assertTrue(reportText.contains("0 requests finished during")); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index 7ec713cf5e..9d29d4cfd2 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -571,7 +571,7 @@ public void testRequestProfilerWithCommittedStream() Assert.fail("Unexpected error " + ex); } } - RequestProfiler.disableAndClearProfiler(); + RequestProfiler.disableAndResetProfiler(); } @Test