Skip to content

Commit

Permalink
SNOW-1754295 - Prep for upcoming subscoped tokens PR (#867)
Browse files Browse the repository at this point in the history
1. Rename ExternalVolumeManager --> PresignedUrlExternalVolumeManager
2. Rename ExternalVolume --> PresignedUrlExternalVolume
3. Change BlobPath field names from filename/url (which were still pretty ambiguous) to fileRegistrationPath / uploadPath.

This PR does not introduce any logical changes; it prepares the branch for the next (stacked) PR, which actually brings in a subscoped-token-based external volume manager.
  • Loading branch information
sfc-gh-hmadan authored Oct 21, 2024
1 parent ee48554 commit 513b589
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 69 deletions.
33 changes: 15 additions & 18 deletions src/main/java/net/snowflake/ingest/streaming/internal/BlobPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,22 @@
package net.snowflake.ingest.streaming.internal;

/**
* Class to manage blob path strings that might have an embedded security token if its a presigned
* url
* Class to maintain the upload-path (relative to the location for which we have authorized access)
* and the file registration path (relative to the volume location).
*
* <p>In the case of FDN tables, these values are identical as we get access to the account's
* streaming_ingest volume.
*
* <p>In the case of Iceberg tables, these values are different since we scope the token down to a
* per-session subpath under the external volume's location, whereas the file registration still
* needs to happen relative to the ext vol.
*/
public class BlobPath {
public final String blobPath;
public final Boolean hasToken;
public final String fileName;
class BlobPath {
public final String uploadPath;
public final String fileRegistrationPath;

private BlobPath(String fileName, String blobPath, Boolean hasToken) {
this.blobPath = blobPath;
this.hasToken = hasToken;
this.fileName = fileName;
}

public static BlobPath fileNameWithoutToken(String fileName) {
return new BlobPath(fileName, fileName, false);
}

public static BlobPath presignedUrlWithToken(String fileName, String url) {
return new BlobPath(fileName, url, true);
public BlobPath(String uploadPath, String fileRegistrationPath) {
this.uploadPath = uploadPath;
this.fileRegistrationPath = fileRegistrationPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ && shouldStopProcessing(
long flushStartMs = System.currentTimeMillis();
if (this.owningClient.flushLatency != null) {
latencyTimerContextMap.putIfAbsent(
blobPath.fileName, this.owningClient.flushLatency.time());
blobPath.fileRegistrationPath, this.owningClient.flushLatency.time());
}

Supplier<BlobMetadata> supplier =
Expand All @@ -510,7 +510,7 @@ && shouldStopProcessing(
+ " detail=%s, trace=%s, all channels in the blob will be"
+ " invalidated",
this.owningClient.getName(),
blobPath.fileName,
blobPath.fileRegistrationPath,
ex,
ex.getMessage(),
getStackTrace(ex));
Expand Down Expand Up @@ -540,7 +540,7 @@ && shouldStopProcessing(

blobs.add(
new Pair<>(
new BlobData<>(blobPath.fileName, blobData),
new BlobData<>(blobPath.fileRegistrationPath, blobData),
CompletableFuture.supplyAsync(supplier, this.buildUploadWorkers)));

logger.logInfo(
Expand Down Expand Up @@ -600,7 +600,7 @@ BlobMetadata buildAndUpload(
// Construct the blob along with the metadata of the blob
BlobBuilder.Blob blob =
BlobBuilder.constructBlobAndMetadata(
blobPath.fileName,
blobPath.fileRegistrationPath,
blobData,
bdecVersion,
this.owningClient.getInternalParameterProvider());
Expand Down Expand Up @@ -632,7 +632,7 @@ BlobMetadata upload(
List<ChunkMetadata> metadata,
BlobStats blobStats)
throws NoSuchAlgorithmException {
logger.logInfo("Start uploading blob={}, size={}", blobPath.fileName, blob.length);
logger.logInfo("Start uploading blob={}, size={}", blobPath.fileRegistrationPath, blob.length);
long startTime = System.currentTimeMillis();

Timer.Context uploadContext = Utils.createTimerContext(this.owningClient.uploadLatency);
Expand All @@ -648,14 +648,14 @@ BlobMetadata upload(

logger.logInfo(
"Finish uploading blob={}, size={}, timeInMillis={}",
blobPath.fileName,
blobPath.fileRegistrationPath,
blob.length,
System.currentTimeMillis() - startTime);

// at this point we know for sure if the BDEC file has data for more than one chunk, i.e.
// spans mixed tables or not
return BlobMetadata.createBlobMetadata(
blobPath.fileName,
blobPath.fileRegistrationPath,
BlobBuilder.computeMD5(blob),
bdecVersion,
metadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package net.snowflake.ingest.streaming.internal;

/** Interface to manage {@link InternalStage} and {@link ExternalVolume} for {@link FlushService} */
/**
* Interface to manage {@link InternalStage} and {@link PresignedUrlExternalVolume} for {@link
* FlushService}
*/
interface IStorageManager {

/** Default max upload retries for streaming ingest storage */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,11 @@ SnowflakeFileTransferMetadataV1 fetchSignedURL(String fileName)

/** Upload file to internal stage */
public void put(BlobPath blobPath, byte[] blob) {
String filePath = blobPath.fileName;
if (this.isLocalFS()) {
putLocal(this.fileTransferMetadataWithAge.localLocation, filePath, blob);
putLocal(this.fileTransferMetadataWithAge.localLocation, blobPath.fileRegistrationPath, blob);
} else {
try {
putRemote(filePath, blob, 0);
putRemote(blobPath.uploadPath, blob, 0);
} catch (SnowflakeSQLException | IOException e) {
throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ public BlobPath generateBlobPath(String fullyQualifiedTableName) {
// other implementation
// of IStorageManager does end up using this argument.
Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
return BlobPath.fileNameWithoutToken(getNextFileName(calendar, this.clientPrefix));
String fileName = getNextFileName(calendar, this.clientPrefix);
return new BlobPath(fileName /* uploadPath */, fileName /* fileRegistrationPath */);
}

/** For TESTING */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import net.snowflake.ingest.utils.SFException;

/** Handles uploading files to the Iceberg Table's external volume's table data path */
class ExternalVolume implements IStorage {
class PresignedUrlExternalVolume implements IStorage {
// TODO everywhere: static final should be named in all capitals
private static final Logging logger = new Logging(ExternalVolume.class);
private static final Logging logger = new Logging(PresignedUrlExternalVolume.class);
private static final int DEFAULT_PRESIGNED_URL_COUNT = 10;
private static final int DEFAULT_PRESIGNED_URL_TIMEOUT_IN_SECONDS = 900;

Expand Down Expand Up @@ -74,7 +74,7 @@ class ExternalVolume implements IStorage {
private final FileLocationInfo locationInfo;
private final SnowflakeFileTransferMetadataWithAge fileTransferMetadata;

ExternalVolume(
PresignedUrlExternalVolume(
String clientName,
String clientPrefix,
Long deploymentId,
Expand Down Expand Up @@ -123,12 +123,13 @@ class ExternalVolume implements IStorage {
@Override
public void put(BlobPath blobPath, byte[] blob) {
if (this.fileTransferMetadata.isLocalFS) {
InternalStage.putLocal(this.fileTransferMetadata.localLocation, blobPath.fileName, blob);
InternalStage.putLocal(
this.fileTransferMetadata.localLocation, blobPath.fileRegistrationPath, blob);
return;
}

try {
putRemote(blobPath.blobPath, blob);
putRemote(blobPath.uploadPath, blob);
} catch (Throwable e) {
throw new SFException(e, ErrorCode.BLOB_UPLOAD_FAILURE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import net.snowflake.ingest.utils.SFException;

/** Class to manage multiple external volumes */
class ExternalVolumeManager implements IStorageManager {
class PresignedUrlExternalVolumeManager implements IStorageManager {
// TODO: Rename all logger members to LOGGER and check in code formatting rules
private static final Logging logger = new Logging(ExternalVolumeManager.class);
private static final Logging logger = new Logging(PresignedUrlExternalVolumeManager.class);
// Reference to the external volume per table
private final Map<String, ExternalVolume> externalVolumeMap;
private final Map<String, PresignedUrlExternalVolume> externalVolumeMap;

// name of the owning client
private final String clientName;
Expand Down Expand Up @@ -48,7 +48,7 @@ class ExternalVolumeManager implements IStorageManager {
* @param clientName the name of the client
* @param snowflakeServiceClient the Snowflake service client used for configure calls
*/
ExternalVolumeManager(
PresignedUrlExternalVolumeManager(
boolean isTestMode,
String role,
String clientName,
Expand All @@ -66,7 +66,7 @@ class ExternalVolumeManager implements IStorageManager {
throw new SFException(e, ErrorCode.CLIENT_CONFIGURE_FAILURE, e.getMessage());
}
logger.logDebug(
"Created ExternalVolumeManager with clientName=%s and clientPrefix=%s",
"Created PresignedUrlExternalVolumeManager with clientName=%s and clientPrefix=%s",
clientName, clientPrefix);
}

Expand All @@ -77,7 +77,7 @@ class ExternalVolumeManager implements IStorageManager {
* @return target storage
*/
@Override
public ExternalVolume getStorage(String fullyQualifiedTableName) {
public PresignedUrlExternalVolume getStorage(String fullyQualifiedTableName) {
// Only one chunk per blob in Iceberg mode.
return getVolumeSafe(fullyQualifiedTableName);
}
Expand All @@ -104,8 +104,8 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {
}

try {
ExternalVolume externalVolume =
new ExternalVolume(
PresignedUrlExternalVolume externalVolume =
new PresignedUrlExternalVolume(
clientName,
getClientPrefix(),
deploymentId,
Expand All @@ -132,9 +132,9 @@ public void registerTable(TableRef tableRef, FileLocationInfo locationInfo) {

@Override
public BlobPath generateBlobPath(String fullyQualifiedTableName) {
ExternalVolume volume = getVolumeSafe(fullyQualifiedTableName);
PresignedUrlExternalVolume volume = getVolumeSafe(fullyQualifiedTableName);
PresignedUrlInfo urlInfo = volume.dequeueUrlInfo();
return BlobPath.presignedUrlWithToken(urlInfo.fileName, urlInfo.url);
return new BlobPath(urlInfo.url /* uploadPath */, urlInfo.fileName /* fileRegistrationPath */);
}

/**
Expand All @@ -147,8 +147,8 @@ public String getClientPrefix() {
return this.clientPrefix;
}

private ExternalVolume getVolumeSafe(String fullyQualifiedTableName) {
ExternalVolume volume = this.externalVolumeMap.get(fullyQualifiedTableName);
private PresignedUrlExternalVolume getVolumeSafe(String fullyQualifiedTableName) {
PresignedUrlExternalVolume volume = this.externalVolumeMap.get(fullyQualifiedTableName);

if (volume == null) {
throw new SFException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea

this.storageManager =
isIcebergMode
? new ExternalVolumeManager(
? new PresignedUrlExternalVolumeManager(
isTestMode, this.role, this.name, this.snowflakeServiceClient)
: new InternalStageManager<T>(
isTestMode, this.role, this.name, this.snowflakeServiceClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ private abstract static class TestContext<T> implements AutoCloseable {
FlushService<T> flushService;
IStorageManager storageManager;
InternalStage storage;
ExternalVolume extVolume;
PresignedUrlExternalVolume extVolume;
ParameterProvider parameterProvider;
RegisterService registerService;

final List<ChannelData<T>> channelData = new ArrayList<>();

TestContext() {
storage = Mockito.mock(InternalStage.class);
extVolume = Mockito.mock(ExternalVolume.class);
extVolume = Mockito.mock(PresignedUrlExternalVolume.class);
parameterProvider = new ParameterProvider(isIcebergMode);
InternalParameterProvider internalParameterProvider =
new InternalParameterProvider(isIcebergMode);
Expand All @@ -118,7 +118,7 @@ private abstract static class TestContext<T> implements AutoCloseable {
storageManager =
Mockito.spy(
isIcebergMode
? new ExternalVolumeManager(
? new PresignedUrlExternalVolumeManager(
true, "role", "client", MockSnowflakeServiceClient.create())
: new InternalStageManager(true, "role", "client", null));
Mockito.doReturn(isIcebergMode ? extVolume : storage)
Expand Down Expand Up @@ -148,7 +148,7 @@ ChannelData<T> flushChannel(String name) {
BlobMetadata buildAndUpload() throws Exception {
List<List<ChannelData<T>>> blobData = Collections.singletonList(channelData);
return flushService.buildAndUpload(
BlobPath.fileNameWithoutToken("file_name"),
new BlobPath("file_name" /* uploadPath */, "file_name" /* fileRegistrationPath */),
blobData,
blobData.get(0).get(0).getChannelContext().getFullyQualifiedTableName());
}
Expand Down Expand Up @@ -966,7 +966,7 @@ public void testBuildAndUpload() throws Exception {
blobCaptor.capture(),
metadataCaptor.capture(),
ArgumentMatchers.any());
Assert.assertEquals("file_name", nameCaptor.getValue().fileName);
Assert.assertEquals("file_name", nameCaptor.getValue().fileRegistrationPath);

ChunkMetadata metadataResult = metadataCaptor.getValue().get(0);
List<ChannelMetadata> channelMetadataResult = metadataResult.getChannels();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ public void testPutRemote() throws Exception {
final ArgumentCaptor<SnowflakeFileTransferConfig> captor =
ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class);

stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes);
stage.put(
new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */),
dataBytes);
PowerMockito.verifyStatic(SnowflakeFileTransferAgent.class);
SnowflakeFileTransferAgent.uploadWithoutConnection(captor.capture());
SnowflakeFileTransferConfig capturedConfig = captor.getValue();
Expand Down Expand Up @@ -186,7 +188,8 @@ public void testPutLocal() throws Exception {
1));
Mockito.doReturn(true).when(stage).isLocalFS();

stage.put(BlobPath.fileNameWithoutToken(fileName), dataBytes);
stage.put(
new BlobPath(fileName /* uploadPath */, fileName /* fileRegistrationPath */), dataBytes);
Path outputPath = Paths.get(fullFilePath, fileName);
List<String> output = Files.readAllLines(outputPath);
Assert.assertEquals(1, output.size());
Expand Down Expand Up @@ -223,7 +226,9 @@ public void doTestPutRemoteRefreshes() throws Exception {
ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class);

try {
stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes);
stage.put(
new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */),
dataBytes);
Assert.fail("Should not succeed");
} catch (SFException ex) {
// Expected behavior given mocked response
Expand Down Expand Up @@ -272,7 +277,9 @@ public void testPutRemoteGCS() throws Exception {
SnowflakeFileTransferMetadataV1 metaMock = Mockito.mock(SnowflakeFileTransferMetadataV1.class);

Mockito.doReturn(metaMock).when(stage).fetchSignedURL(Mockito.any());
stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes);
stage.put(
new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */),
dataBytes);
SnowflakeFileTransferAgent.uploadWithoutConnection(Mockito.any());
Mockito.verify(stage, times(1)).fetchSignedURL("test/path");
}
Expand Down Expand Up @@ -593,7 +600,9 @@ public Object answer(org.mockito.invocation.InvocationOnMock invocation)
final ArgumentCaptor<SnowflakeFileTransferConfig> captor =
ArgumentCaptor.forClass(SnowflakeFileTransferConfig.class);

stage.put(BlobPath.fileNameWithoutToken("test/path"), dataBytes);
stage.put(
new BlobPath("test/path" /* uploadPath */, "test/path" /* fileRegistrationPath */),
dataBytes);

PowerMockito.verifyStatic(SnowflakeFileTransferAgent.class, times(maxUploadRetryCount));
SnowflakeFileTransferAgent.uploadWithoutConnection(captor.capture());
Expand Down
Loading

0 comments on commit 513b589

Please sign in to comment.