Skip to content

Commit

Permalink
Set createdon in BlobMetadata and is_iceberg in Drop Channel request (#…
Browse files Browse the repository at this point in the history
…845)

- Set createdOn on the blob Metadata, which can be used in the service to populate the FileRegistrationRequest
- Set is_iceberg in /channels/drop API payload since it gets validated the same way as open channel (where we do have the is_iceberg API available)
  • Loading branch information
sfc-gh-hmadan authored Sep 30, 2024
1 parent 7b3881b commit 27cd3a7
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ static <T> Blob constructBlobAndMetadata(
chunkMetadataBuilder
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
.setCreatedOn(0L)
// set createdOn in seconds
.setCreatedOn(System.currentTimeMillis() / 1000)
.setExtendedMetadataSize(-1L);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,25 @@ class DropChannelRequestInternal implements IStreamingIngestRequest {
@JsonProperty("client_sequencer")
Long clientSequencer;

@JsonProperty("is_iceberg")
boolean isIceberg;

DropChannelRequestInternal(
String requestId,
String role,
String database,
String schema,
String table,
String channel,
boolean isIceberg,
Long clientSequencer) {
this.requestId = requestId;
this.role = role;
this.database = database;
this.schema = schema;
this.table = table;
this.channel = channel;
this.isIceberg = isIceberg;
this.clientSequencer = clientSequencer;
}

Expand Down Expand Up @@ -74,6 +79,10 @@ String getSchema() {
return schema;
}

boolean isIceberg() {
return isIceberg;
}

Long getClientSequencer() {
return clientSequencer;
}
Expand All @@ -86,7 +95,7 @@ String getFullyQualifiedTableName() {
public String getStringForLogging() {
return String.format(
"DropChannelRequest(requestId=%s, role=%s, db=%s, schema=%s, table=%s, channel=%s,"
+ " clientSequencer=%s)",
requestId, role, database, schema, table, channel, clientSequencer);
+ " isIceberg=%s, clientSequencer=%s)",
requestId, role, database, schema, table, channel, isIceberg, clientSequencer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ public void dropChannel(DropChannelRequest request) {
request.getSchemaName(),
request.getTableName(),
request.getChannelName(),
this.isIcebergMode,
request instanceof DropChannelVersionRequest
? ((DropChannelVersionRequest) request).getClientSequencer()
: null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,18 @@
import net.snowflake.ingest.utils.Constants;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class SnowflakeServiceClientTest {
@Parameterized.Parameters(name = "isIceberg: {0}")
public static Object[] isIceberg() {
return new Object[] {false, true};
}

@Parameterized.Parameter public boolean isIceberg;

private SnowflakeServiceClient snowflakeServiceClient;

@Before
Expand Down Expand Up @@ -50,7 +60,7 @@ public void testOpenChannel() throws IngestResponseException, IOException {
"test_table",
"test_channel",
Constants.WriteMode.CLOUD_STORAGE,
false,
isIceberg,
"test_offset_token");
OpenChannelResponse openChannelResponse =
snowflakeServiceClient.openChannel(openChannelRequest);
Expand All @@ -72,7 +82,14 @@ public void testOpenChannel() throws IngestResponseException, IOException {
public void testDropChannel() throws IngestResponseException, IOException {
DropChannelRequestInternal dropChannelRequest =
new DropChannelRequestInternal(
"request_id", "test_role", "test_db", "test_schema", "test_table", "test_channel", 0L);
"request_id",
"test_role",
"test_db",
"test_schema",
"test_table",
"test_channel",
isIceberg,
0L);
DropChannelResponse dropChannelResponse =
snowflakeServiceClient.dropChannel(dropChannelRequest);
assert dropChannelResponse.getStatusCode() == 0L;
Expand Down

0 comments on commit 27cd3a7

Please sign in to comment.