diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java index 8de78460a..7a6b23670 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java @@ -5,25 +5,22 @@ package net.snowflake.ingest.streaming.internal; /** - * Class to manage blob path strings that might have an embedded security token if its a presigned - * url + * Class to maintain the upload-path (relative to the location for which we have authorized access) + * and the file registration path (relative to the volume location). + * + *

In the case of FDN tables, these values are identical as we get access to the account's + * streaming_ingest volume. + * + *

In the case of Iceberg tables, these values are different since we scope the token down to a + * per-session subpath under the external volume's location, whereas the file registration still + * needs to happen relative to the ext vol. */ -public class BlobPath { - public final String blobPath; - public final Boolean hasToken; - public final String fileName; +class BlobPath { + public final String uploadPath; + public final String fileRegistrationPath; - private BlobPath(String fileName, String blobPath, Boolean hasToken) { - this.blobPath = blobPath; - this.hasToken = hasToken; - this.fileName = fileName; - } - - public static BlobPath fileNameWithoutToken(String fileName) { - return new BlobPath(fileName, fileName, false); - } - - public static BlobPath presignedUrlWithToken(String fileName, String url) { - return new BlobPath(fileName, url, true); + public BlobPath(String uploadPath, String fileRegistrationPath) { + this.uploadPath = uploadPath; + this.fileRegistrationPath = fileRegistrationPath; } } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java index d11762340..9b3c7eda4 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/FlushService.java @@ -492,7 +492,7 @@ && shouldStopProcessing( long flushStartMs = System.currentTimeMillis(); if (this.owningClient.flushLatency != null) { latencyTimerContextMap.putIfAbsent( - blobPath.fileName, this.owningClient.flushLatency.time()); + blobPath.fileRegistrationPath, this.owningClient.flushLatency.time()); } Supplier supplier = @@ -510,7 +510,7 @@ && shouldStopProcessing( + " detail=%s, trace=%s, all channels in the blob will be" + " invalidated", this.owningClient.getName(), - blobPath.fileName, + blobPath.fileRegistrationPath, ex, ex.getMessage(), getStackTrace(ex)); @@ -540,7 +540,7 @@ && shouldStopProcessing( blobs.add( new Pair<>( - new BlobData<>(blobPath.fileName, blobData), + new BlobData<>(blobPath.fileRegistrationPath, blobData), CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers))); logger.logInfo( @@ -600,7 +600,7 @@ BlobMetadata buildAndUpload( // Construct the blob along with the metadata of the blob BlobBuilder.Blob blob = BlobBuilder.constructBlobAndMetadata( - blobPath.fileName, + blobPath.fileRegistrationPath, blobData, bdecVersion, this.owningClient.getInternalParameterProvider()); @@ -632,7 +632,7 @@ BlobMetadata upload( List metadata, BlobStats blobStats) throws NoSuchAlgorithmException { - logger.logInfo("Start uploading blob={}, size={}", blobPath.fileName, blob.length); + logger.logInfo("Start uploading blob={}, size={}", blobPath.fileRegistrationPath, blob.length); long startTime = System.currentTimeMillis(); Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency); @@ -648,14 +648,14 @@ BlobMetadata upload( logger.logInfo( "Finish uploading blob={}, size={}, timeInMillis={}", - blobPath.fileName, + blobPath.fileRegistrationPath, blob.length, System.currentTimeMillis() - startTime); // at this point we know for sure if the BDEC file has data for more than one chunk, i.e. // spans mixed tables or not return BlobMetadata.createBlobMetadata( - blobPath.fileName, + blobPath.fileRegistrationPath, BlobBuilder.computeMD5(blob), bdecVersion, metadata, diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java index edd92f939..9a173365b 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/IStorageManager.java @@ -4,7 +4,10 @@ package net.snowflake.ingest.streaming.internal; -/** Interface to manage {@link InternalStage} and {@link ExternalVolume} for {@link FlushService} */ +/** + * Interface to manage {@link InternalStage} and {@link PresignedUrlExternalVolume} for {@link + * FlushService} + */ interface IStorageManager { /** Default max upload retries for streaming ingest storage */ diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java index 984201555..adf7a0256 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStage.java @@ -301,12 +301,11 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName) /** Upload file to internal stage */ public void put(BlobPath blobPath, byte[] blob) { - String filePath = blobPath.fileName; if (this.isLocalFS()) { - putLocal(this.fileTransferMetadataWithAge.localLocation, filePath, blob); + putLocal(this.fileTransferMetadataWithAge.localLocation, blobPath.fileRegistrationPath, blob); } else { try { - putRemote(filePath, blob, 0); + putRemote(blobPath.uploadPath, blob, 0); } catch (SnowflakeSQLException | IOException e) { throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java index 14ca18822..aa770c92f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/InternalStageManager.java @@ -153,7 +153,8 @@ public BlobPath generateBlobPath(String fullyQualifiedTableName) { // other implementation // of IStorageManager does end up using this argument. Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); - return BlobPath.fileNameWithoutToken(getNextFileName(calendar, this.clientPrefix)); + String fileName = getNextFileName(calendar, this.clientPrefix); + return new BlobPath(fileName /* uploadPath */, fileName /* fileRegistrationPath */); } /** For TESTING */ diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java b/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolume.java similarity index 98% rename from src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java rename to src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolume.java index 4f3607446..ed8e2891f 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolume.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolume.java @@ -36,9 +36,9 @@ import net.snowflake.ingest.utils.SFException; /** Handles uploading files to the Iceberg Table's external volume's table data path */ -class ExternalVolume implements IStorage { +class PresignedUrlExternalVolume implements IStorage { // TODO everywhere: static final should be named in all capitals - private static final Logging logger = new Logging(ExternalVolume.class); + private static final Logging logger = new Logging(PresignedUrlExternalVolume.class); private static final int DEFAULT_PRESIGNED_URL_COUNT = 10; private static final int DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS = 900; @@ -74,7 +74,7 @@ class ExternalVolume implements IStorage { private final FileLocationInfo locationInfo; private final SnowflakeFileTransferMetadataWithAge fileTransferMetadata; - ExternalVolume( + PresignedUrlExternalVolume( String clientName, String clientPrefix, Long deploymentId, @@ -123,12 +123,13 @@ class ExternalVolume implements IStorage { @Override public void put(BlobPath blobPath, byte[] blob) { if (this.fileTransferMetadata.isLocalFS) { - InternalStage.putLocal(this.fileTransferMetadata.localLocation, blobPath.fileName, blob); + InternalStage.putLocal( + this.fileTransferMetadata.localLocation, blobPath.fileRegistrationPath, blob); return; } try { - putRemote(blobPath.blobPath, blob); + putRemote(blobPath.uploadPath, blob); } catch (Throwable e) { throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE); } diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java b/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java similarity index 84% rename from src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java rename to src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java index 556d02b9b..4147430bc 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManager.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/PresignedUrlExternalVolumeManager.java @@ -15,11 +15,11 @@ import net.snowflake.ingest.utils.SFException; /** Class to manage multiple external volumes */ -class ExternalVolumeManager implements IStorageManager { +class PresignedUrlExternalVolumeManager implements IStorageManager { // TODO: Rename all logger members to LOGGER and checkin code formatting rules - private static final Logging logger = new Logging(ExternalVolumeManager.class); + private static final Logging logger = new Logging(PresignedUrlExternalVolumeManager.class); // Reference to the external volume per table - private final Map externalVolumeMap; + private final Map externalVolumeMap; // name of the owning client private final String clientName; @@ -48,7 +48,7 @@ class ExternalVolumeManager implements IStorageManager { * @param clientName the name of the client * @param snowflakeServiceClient the Snowflake service client used for configure calls */ - ExternalVolumeManager( + PresignedUrlExternalVolumeManager( boolean isTestMode, String role, String clientName, @@ -66,7 +66,7 @@ class ExternalVolumeManager implements IStorageManager { throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage()); } logger.logDebug( - "Created ExternalVolumeManager with clientName=%s and clientPrefix=%s", + "Created PresignedUrlExternalVolumeManager with clientName=%s and clientPrefix=%s", clientName, clientPrefix); } @@ -77,7 +77,7 @@ class ExternalVolumeManager implements IStorageManager { * @return target storage */ @Override - public ExternalVolume getStorage(String fullyQualifiedTableName) { + public PresignedUrlExternalVolume getStorage(String fullyQualifiedTableName) { // Only one chunk per blob in Iceberg mode. return getVolumeSafe(fullyQualifiedTableName); } @@ -104,8 +104,8 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) { } try { - ExternalVolume externalVolume = - new ExternalVolume( + PresignedUrlExternalVolume externalVolume = + new PresignedUrlExternalVolume( clientName, getClientPrefix(), deploymentId, @@ -132,9 +132,9 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) { @Override public BlobPath generateBlobPath(String fullyQualifiedTableName) { - ExternalVolume volume = getVolumeSafe(fullyQualifiedTableName); + PresignedUrlExternalVolume volume = getVolumeSafe(fullyQualifiedTableName); PresignedUrlInfo urlInfo = volume.dequeueUrlInfo(); - return BlobPath.presignedUrlWithToken(urlInfo.fileName, urlInfo.url); + return new BlobPath(urlInfo.url /* uploadPath */, urlInfo.fileName /* fileRegistrationPath */); } /** @@ -147,8 +147,8 @@ public String getClientPrefix() { return this.clientPrefix; } - private ExternalVolume getVolumeSafe(String fullyQualifiedTableName) { - ExternalVolume volume = this.externalVolumeMap.get(fullyQualifiedTableName); + private PresignedUrlExternalVolume getVolumeSafe(String fullyQualifiedTableName) { + PresignedUrlExternalVolume volume = this.externalVolumeMap.get(fullyQualifiedTableName); if (volume == null) { throw new SFException( diff --git a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java index 7751499f1..f553fb7ff 100644 --- a/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java +++ b/src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestClientInternal.java @@ -238,7 +238,7 @@ public class SnowflakeStreamingIngestClientInternal implements SnowflakeStrea this.storageManager = isIcebergMode - ? new ExternalVolumeManager( + ? new PresignedUrlExternalVolumeManager( isTestMode, this.role, this.name, this.snowflakeServiceClient) : new InternalStageManager( isTestMode, this.role, this.name, this.snowflakeServiceClient); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java index 2a1a3d97b..7b8ba7605 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/FlushServiceTest.java @@ -100,7 +100,7 @@ private abstract static class TestContext implements AutoCloseable { FlushService flushService; IStorageManager storageManager; InternalStage storage; - ExternalVolume extVolume; + PresignedUrlExternalVolume extVolume; ParameterProvider parameterProvider; RegisterService registerService; @@ -108,7 +108,7 @@ private abstract static class TestContext implements AutoCloseable { TestContext() { storage = Mockito.mock(InternalStage.class); - extVolume = Mockito.mock(ExternalVolume.class); + extVolume = Mockito.mock(PresignedUrlExternalVolume.class); parameterProvider = new ParameterProvider(isIcebergMode); InternalParameterProvider internalParameterProvider = new InternalParameterProvider(isIcebergMode); @@ -118,7 +118,7 @@ private abstract static class TestContext implements AutoCloseable { storageManager = Mockito.spy( isIcebergMode - ? new ExternalVolumeManager( + ? new PresignedUrlExternalVolumeManager( true, "role", "client", MockSnowflakeServiceClient.create()) : new InternalStageManager(true, "role", "client", null)); Mockito.doReturn(isIcebergMode ? extVolume : storage) @@ -148,7 +148,7 @@ ChannelData flushChannel(String name) { BlobMetadata buildAndUpload() throws Exception { List>> blobData = Collections.singletonList(channelData); return flushService.buildAndUpload( - BlobPath.fileNameWithoutToken("file_name"), + new BlobPath("file_name" /* uploadPath */, "file_name" /* fileRegistrationPath */), blobData, blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName()); } @@ -966,7 +966,7 @@ public void testBuildAndUpload() throws Exception { blobCaptor.capture(), metadataCaptor.capture(), ArgumentMatchers.any()); - Assert.assertEquals("file_name", nameCaptor.getValue().fileName); + Assert.assertEquals("file_name", nameCaptor.getValue().fileRegistrationPath); ChunkMetadata metadataResult = metadataCaptor.getValue().get(0); List channelMetadataResult = metadataResult.getChannels(); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java index e6335de0f..0cfadb92e 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/InternalStageTest.java @@ -149,7 +149,9 @@ public void testPutRemote() throws Exception { final ArgumentCaptor captor = ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class); - stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes); + stage.put( + new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */), + dataBytes); PowerMockito.verifyStatic(SnowflakeFileTransferAgent.class); SnowflakeFileTransferAgent.uploadWithoutConnection(captor.capture()); SnowflakeFileTransferConfig capturedConfig = captor.getValue(); @@ -186,7 +188,8 @@ public void testPutLocal() throws Exception { 1)); Mockito.doReturn(true).when(stage).isLocalFS(); - stage.put(BlobPath.fileNameWithoutToken(fileName), dataBytes); + stage.put( + new BlobPath(fileName /* uploadPath */, fileName /* fileRegistrationPath */), dataBytes); Path outputPath = Paths.get(fullFilePath, fileName); List output = Files.readAllLines(outputPath); Assert.assertEquals(1, output.size()); @@ -223,7 +226,9 @@ public void doTestPutRemoteRefreshes() throws Exception { ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class); try { - stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes); + stage.put( + new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */), + dataBytes); Assert.fail("Should not succeed"); } catch (SFException ex) { // Expected behavior given mocked response @@ -272,7 +277,9 @@ public void testPutRemoteGCS() throws Exception { SnowflakeFileTransferMetadataV1 metaMock = Mockito.mock(SnowflakeFileTransferMetadataV1.class); Mockito.doReturn(metaMock).when(stage).fetchSignedURL(Mockito.any()); - stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes); + stage.put( + new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */), + dataBytes); SnowflakeFileTransferAgent.uploadWithoutConnection(Mockito.any()); Mockito.verify(stage, times(1)).fetchSignedURL("test/path"); } @@ -593,7 +600,9 @@ public Object answer(org.mockito.invocation.InvocationOnMock invocation) final ArgumentCaptor captor = ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class); - stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes); + stage.put( + new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */), + dataBytes); PowerMockito.verifyStatic(SnowflakeFileTransferAgent.class, times(maxUploadRetryCount)); SnowflakeFileTransferAgent.uploadWithoutConnection(captor.capture()); diff --git a/src/test/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManagerTest.java b/src/test/java/net/snowflake/ingest/streaming/internal/PresignedUrlPresignedUrlExternalVolumeManagerTest.java similarity index 89% rename from src/test/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManagerTest.java rename to src/test/java/net/snowflake/ingest/streaming/internal/PresignedUrlPresignedUrlExternalVolumeManagerTest.java index 3d650d1f8..afc647c54 100644 --- a/src/test/java/net/snowflake/ingest/streaming/internal/ExternalVolumeManagerTest.java +++ b/src/test/java/net/snowflake/ingest/streaming/internal/PresignedUrlPresignedUrlExternalVolumeManagerTest.java @@ -24,16 +24,16 @@ import org.junit.Before; import org.junit.Test; -public class ExternalVolumeManagerTest { +public class PresignedUrlPresignedUrlExternalVolumeManagerTest { private static final ObjectMapper objectMapper = new ObjectMapper(); - private ExternalVolumeManager manager; + private PresignedUrlExternalVolumeManager manager; private FileLocationInfo fileLocationInfo; private ExecutorService executorService; @Before public void setup() throws JsonProcessingException { this.manager = - new ExternalVolumeManager( + new PresignedUrlExternalVolumeManager( false /* isTestMode */, "role", "clientName", MockSnowflakeServiceClient.create()); Map fileLocationInfoMap = MockSnowflakeServiceClient.getStageLocationMap(); @@ -66,13 +66,13 @@ public void testRegister() { public void testConcurrentRegisterTable() throws Exception { int numThreads = 50; int timeoutInSeconds = 30; - List> allResults = + List> allResults = doConcurrentTest( numThreads, timeoutInSeconds, () -> manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo), () -> manager.getStorage("db.schema.table")); - ExternalVolume extvol = manager.getStorage("db.schema.table"); + PresignedUrlExternalVolume extvol = manager.getStorage("db.schema.table"); assertNotNull(extvol); for (int i = 0; i < numThreads; i++) { assertSame("" + i, extvol, allResults.get(i).get(timeoutInSeconds, TimeUnit.SECONDS)); @@ -82,7 +82,7 @@ public void testConcurrentRegisterTable() throws Exception { @Test public void testGetStorage() { this.manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo); - ExternalVolume extvol = this.manager.getStorage("db.schema.table"); + PresignedUrlExternalVolume extvol = this.manager.getStorage("db.schema.table"); assertNotNull(extvol); } @@ -105,9 +105,8 @@ public void testGenerateBlobPath() { manager.registerTable(new TableRef("db", "schema", "table"), fileLocationInfo); BlobPath blobPath = manager.generateBlobPath("db.schema.table"); assertNotNull(blobPath); - assertTrue(blobPath.hasToken); - assertEquals(blobPath.fileName, "f1"); - assertEquals(blobPath.blobPath, "http://f1.com?token=t1"); + assertEquals(blobPath.fileRegistrationPath, "f1"); + assertEquals(blobPath.uploadPath, "http://f1.com?token=t1"); } @Test @@ -129,8 +128,7 @@ public void testConcurrentGenerateBlobPath() throws Exception { for (int i = 0; i < numThreads; i++) { BlobPath blobPath = allResults.get(0).get(timeoutInSeconds, TimeUnit.SECONDS); assertNotNull(blobPath); - assertTrue(blobPath.hasToken); - assertTrue(blobPath.blobPath, blobPath.blobPath.contains("http://f1.com?token=t")); + assertTrue(blobPath.uploadPath, blobPath.uploadPath.contains("http://f1.com?token=t")); } }