Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Jul 15, 2024
1 parent c1d7116 commit 28dea75
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void refreshToken() throws IOException {

/** Helper method for making refresh request */
private HttpUriRequest makeRefreshTokenRequest() {
// TODO SNOW-1538108 Use SnowflakeServiceClient to make the request
HttpPost post = new HttpPost(oAuthCredential.get().getOAuthTokenEndpoint());
post.addHeader(HttpHeaders.CONTENT_TYPE, OAUTH_CONTENT_TYPE_HEADER);
post.addHeader(HttpHeaders.AUTHORIZATION, oAuthCredential.get().getAuthHeader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum OnErrorOption {
private final ZoneId defaultTimezone;

private final String offsetToken;
private final boolean isOffsetTokenProvided;

private final OffsetTokenVerificationFunction offsetTokenVerificationFunction;

Expand All @@ -58,6 +59,7 @@ public static class OpenChannelRequestBuilder {
private ZoneId defaultTimezone;

private String offsetToken;
private boolean isOffsetTokenProvided = false;

private OffsetTokenVerificationFunction offsetTokenVerificationFunction;

Expand Down Expand Up @@ -93,6 +95,7 @@ public OpenChannelRequestBuilder setDefaultTimezone(ZoneId defaultTimezone) {

public OpenChannelRequestBuilder setOffsetToken(String offsetToken) {
this.offsetToken = offsetToken;
this.isOffsetTokenProvided = true;
return this;
}

Expand Down Expand Up @@ -122,6 +125,7 @@ private OpenChannelRequest(OpenChannelRequestBuilder builder) {
this.onErrorOption = builder.onErrorOption;
this.defaultTimezone = builder.defaultTimezone;
this.offsetToken = builder.offsetToken;
this.isOffsetTokenProvided = builder.isOffsetTokenProvided;
this.offsetTokenVerificationFunction = builder.offsetTokenVerificationFunction;
}

Expand Down Expand Up @@ -157,6 +161,10 @@ public String getOffsetToken() {
return this.offsetToken;
}

public boolean isOffsetTokenProvided() {
return this.isOffsetTokenProvided;
}

public OffsetTokenVerificationFunction getOffsetTokenVerificationFunction() {
return this.offsetTokenVerificationFunction;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,42 @@

package net.snowflake.ingest.streaming.internal;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

/** Class used to serialize client configure request */
class ClientConfigureRequest extends ConfigureRequest {
class ClientConfigureRequest implements StreamingIngestRequest {
/**
* Constructor for client configure request
*
* @param role Role to be used for the request.
*/
ClientConfigureRequest(String role) {
setRole(role);
this.role = role;
}

@JsonProperty("role")
private String role;

// File name for the GCS signed url request
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonProperty("file_name")
private String fileName;

String getRole() {
return role;
}

void setRole(String role) {
this.role = role;
}

String getFileName() {
return fileName;
}

void setFileName(String fileName) {
this.fileName = fileName;
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ class OpenChannelResponse extends StreamingIngestResponse {
private List<ColumnMetadata> tableColumns;
private String encryptionKey;
private Long encryptionKeyId;
private FileLocationInfo stageLocation;

@JsonProperty("status_code")
void setStatusCode(Long statusCode) {
Expand Down Expand Up @@ -131,13 +130,4 @@ void setEncryptionKeyId(Long encryptionKeyId) {
Long getEncryptionKeyId() {
return this.encryptionKeyId;
}

@JsonProperty("stage_location")
void setStageLocation(FileLocationInfo stageLocation) {
this.stageLocation = stageLocation;
}

FileLocationInfo getStageLocation() {
return this.stageLocation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ DropChannelResponse dropChannel(DropChannelRequestInternal request)
* @param request the channel status request
* @return the response from the channel status request
*/
ChannelsStatusResponse channelStatus(ChannelsStatusRequest request)
ChannelsStatusResponse getChannelStatus(ChannelsStatusRequest request)
throws IngestResponseException, IOException {
ChannelsStatusResponse response =
executeApiRequestWithRetries(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.codahale.metrics.jmx.JmxReporter;
import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -85,9 +84,6 @@ public class SnowflakeStreamingIngestClientInternal<T> implements SnowflakeStrea

private static final Logging logger = new Logging(SnowflakeStreamingIngestClientInternal.class);

// Object mapper for all marshalling and unmarshalling
private static final ObjectMapper objectMapper = new ObjectMapper();

// Counter to generate unique request ids per client
private final AtomicLong counter = new AtomicLong(0);

Expand Down Expand Up @@ -368,13 +364,6 @@ public SnowflakeStreamingIngestChannelInternal<?> openChannel(OpenChannelRequest
// Add channel to the channel cache
this.channelCache.addChannel(channel);

// Add storage to the storage manager, only for external volume
this.storageManager.addStorage(
response.getDBName(),
response.getSchemaName(),
response.getTableName(),
response.getStageLocation());

return channel;
} catch (IOException | IngestResponseException e) {
throw new SFException(e, ErrorCode.OPEN_CHANNEL_FAILURE, e.getMessage());
Expand Down Expand Up @@ -460,7 +449,7 @@ ChannelsStatusResponse getChannelsStatus(
request.setChannels(requestDTOs);
request.setRole(this.role);

ChannelsStatusResponse response = snowflakeServiceClient.channelStatus(request);
ChannelsStatusResponse response = snowflakeServiceClient.getChannelStatus(request);

for (int idx = 0; idx < channels.size(); idx++) {
SnowflakeStreamingIngestChannelInternal<?> channel = channels.get(idx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,13 @@
class StreamingIngestStorage<T, TLocation> {
private static final ObjectMapper mapper = new ObjectMapper();

// Object mapper for parsing the client/configure response to Jackson version the same as
// jdbc.internal.fasterxml.jackson
/**
* Object mapper for parsing the client/configure response to Jackson version the same as
* jdbc.internal.fasterxml.jackson. We need two different versions of ObjectMapper because {@link
* SnowflakeFileTransferAgent#getFileTransferMetadatas(net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode)}
* expects a different version of json object than {@link StreamingIngestResponse}. TODO:
* SNOW-1493470 Align Jackson version
*/
private static final net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper
parseConfigureResponseMapper =
new net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper();
Expand Down

0 comments on commit 28dea75

Please sign in to comment.