Skip to content

Commit

Permalink
Get test coverage for telemetry reporting (was missing) + Fix Snowfla…
Browse files Browse the repository at this point in the history
…keServiceClient test code to allow per-API overrides (#839)

As part of bringing in ExternalVolume (presigned url based uploads), I decided to not add test-only code into ExtVol / ExtVolManager. This meant the following:

All the httpClient mock logic to return hardcoded responses now needs to handle multiple APIs (client configure AND getPresignedUrls) - this looked very similar to MockSnowflakeServiceClient logic but duplicated 10 times.
To fix this, introduced a per-API behavior override mechanism in MockSNowflakeServiceClient; helped me remove hundreds of lines of code that was just setting up the httpclient mock, and reduce how brittle that setup is
To do away with isTestMode in InternalStage / InternalStageManager, also did the following two changes:
Make the snowflake service client respond to telemetry upload calls correctly when isTestMode is false
Initialize the telemetry client even when isTestMode is false
Tested all affected unit test classes locally
  • Loading branch information
sfc-gh-hmadan authored Sep 23, 2024
1 parent e00831f commit 0919151
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 492 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea
String.format("%s_%s", this.name, System.currentTimeMillis()));

logger.logInfo("Using {} for authorization", this.requestBuilder.getAuthType());
}

if (this.requestBuilder != null) {
// Setup client telemetries if needed
this.setupMetricsForClient();
}
Expand Down
4 changes: 3 additions & 1 deletion src/test/java/net/snowflake/ingest/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.PrivateKey;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.PKCS8EncodedKeySpec;
import java.sql.Connection;
import java.sql.DriverManager;
Expand Down Expand Up @@ -102,7 +104,7 @@ public class TestUtils {
*
* @throws IOException if can't read profile
*/
private static void init() throws Exception {
private static void init() throws NoSuchAlgorithmException, InvalidKeySpecException, IOException {
String testProfilePath = getTestProfilePath();
Path path = Paths.get(testProfilePath);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import static java.time.ZoneOffset.UTC;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -39,7 +42,12 @@ public static Object[] isIcebergMode() {
@Before
public void setup() {
cache = new ChannelCache<>();
client = new SnowflakeStreamingIngestClientInternal<>("client", isIcebergMode);
CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient();
RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient);
client =
new SnowflakeStreamingIngestClientInternal<>(
"client", null, null, httpClient, isIcebergMode, true, requestBuilder, new HashMap<>());

channel1 =
new SnowflakeStreamingIngestChannelInternal<>(
"channel1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,15 @@ private abstract static class TestContext<T> implements AutoCloseable {
FlushService<T> flushService;
IStorageManager storageManager;
InternalStage storage;
ExternalVolume extVolume;
ParameterProvider parameterProvider;
RegisterService registerService;

final List<ChannelData<T>> channelData = new ArrayList<>();

TestContext() {
storage = Mockito.mock(InternalStage.class);
extVolume = Mockito.mock(ExternalVolume.class);
parameterProvider = new ParameterProvider(isIcebergMode);
InternalParameterProvider internalParameterProvider =
new InternalParameterProvider(isIcebergMode);
Expand All @@ -113,9 +115,12 @@ private abstract static class TestContext<T> implements AutoCloseable {
storageManager =
Mockito.spy(
isIcebergMode
? new ExternalVolumeManager(true, "role", "client", null)
? new ExternalVolumeManager(
true, "role", "client", MockSnowflakeServiceClient.create())
: new InternalStageManager(true, "role", "client", null));
Mockito.doReturn(storage).when(storageManager).getStorage(ArgumentMatchers.any());
Mockito.doReturn(isIcebergMode ? extVolume : storage)
.when(storageManager)
.getStorage(ArgumentMatchers.any());
Mockito.when(storageManager.getClientPrefix()).thenReturn("client_prefix");
Mockito.when(client.getParameterProvider())
.thenAnswer((Answer<ParameterProvider>) (i) -> parameterProvider);
Expand Down Expand Up @@ -425,6 +430,7 @@ private static ColumnMetadata createLargeTestTextColumn(String name) {

@Test
public void testGetFilePath() {
// SNOW-1490151 Iceberg testing gaps
if (isIcebergMode) {
// TODO: SNOW-1502887 Blob path generation for iceberg table
return;
Expand Down Expand Up @@ -623,6 +629,7 @@ public void testBlobCreation() throws Exception {
FlushService<?> flushService = testContext.flushService;

// Force = true flushes
// SNOW-1490151 Iceberg testing gaps
if (!isIcebergMode) {
flushService.flush(true).get();
Mockito.verify(flushService, Mockito.atLeast(2))
Expand Down Expand Up @@ -674,6 +681,7 @@ public void testBlobSplitDueToDifferentSchema() throws Exception {

FlushService<?> flushService = testContext.flushService;

// SNOW-1490151 Iceberg testing gaps
if (!isIcebergMode) {
// Force = true flushes
flushService.flush(true).get();
Expand Down Expand Up @@ -711,6 +719,7 @@ public void testBlobSplitDueToChunkSizeLimit() throws Exception {

FlushService<?> flushService = testContext.flushService;

// SNOW-1490151 Iceberg testing gaps
if (!isIcebergMode) {
// Force = true flushes
flushService.flush(true).get();
Expand All @@ -721,6 +730,7 @@ public void testBlobSplitDueToChunkSizeLimit() throws Exception {

@Test
public void testBlobSplitDueToNumberOfChunks() throws Exception {
// SNOW-1490151 Iceberg testing gaps
if (isIcebergMode) {
return;
}
Expand Down Expand Up @@ -799,6 +809,7 @@ public void testBlobSplitDueToNumberOfChunksWithLeftoverChannels() throws Except
channel3.setupSchema(Collections.singletonList(createLargeTestTextColumn("C1")));
channel3.insertRow(Collections.singletonMap("C1", 0), "");

// SNOW-1490151 Iceberg testing gaps
if (isIcebergMode) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient;
import net.snowflake.ingest.connection.RequestBuilder;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.utils.Utils;
Expand Down Expand Up @@ -50,9 +52,19 @@ public static Object[] isIcebergMode() {
@Setup(Level.Trial)
public void setUpBeforeAll() {
// SNOW-1490151: Testing gaps
CloseableHttpClient httpClient = MockSnowflakeServiceClient.createHttpClient();
RequestBuilder requestBuilder = MockSnowflakeServiceClient.createRequestBuilder(httpClient);
client =
new SnowflakeStreamingIngestClientInternal<ParquetChunkData>(
"client_PARQUET", isIcebergMode);
new SnowflakeStreamingIngestClientInternal<>(
"client_PARQUET",
null,
null,
httpClient,
isIcebergMode,
true,
requestBuilder,
new HashMap<>());

channel =
new SnowflakeStreamingIngestChannelInternal<>(
"channel",
Expand Down
Loading

0 comments on commit 0919151

Please sign in to comment.