Re-merge "Add chunk offset to file id key to make each chunk have a u…
Browse files Browse the repository at this point in the history
…nique key" from pull 825 (#865)

* Revert "Revert "Add chunk offset to file id key to make each chunk have a uni… (#848)"

This reverts commit 0930648.

* Comments from Hitesh

* Fix test to handle iceberg case
sfc-gh-psaha authored Oct 31, 2024
1 parent e5c3316 commit 8cbf2da
Showing 7 changed files with 91 additions and 26 deletions.
@@ -86,7 +86,7 @@ static <T> Blob constructBlobAndMetadata(
 
       Flusher<T> flusher = channelsDataPerTable.get(0).createFlusher();
       Flusher.SerializationResult serializedChunk =
-          flusher.serialize(channelsDataPerTable, filePath);
+          flusher.serialize(channelsDataPerTable, filePath, curDataSize);
 
       if (!serializedChunk.channelsMetadataList.isEmpty()) {
         final byte[] compressedChunkData;
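The new third argument is the blob's running data size, so every chunk's start offset within the blob is distinct. A rough sketch of that flow, with hypothetical names and none of the real padding, compression, or encryption bookkeeping; it assumes the SDK's internal Flusher, ChannelData, and ParquetChunkData types are on the classpath (same package):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;

class ChunkBundlingSketch {
  // Hypothetical helper: serialize each table's buffered channels into a chunk and
  // concatenate the chunks, passing the running blob size as the chunk start offset.
  static byte[] bundleChunks(
      List<List<ChannelData<ParquetChunkData>>> blobData, String filePath) throws IOException {
    ByteArrayOutputStream blob = new ByteArrayOutputStream();
    long curDataSize = 0;
    for (List<ChannelData<ParquetChunkData>> channelsDataPerTable : blobData) {
      Flusher<ParquetChunkData> flusher = channelsDataPerTable.get(0).createFlusher();
      // A non-zero offset makes the flusher emit "<name>_<offset>.<ext>" as the file id key.
      Flusher.SerializationResult serializedChunk =
          flusher.serialize(channelsDataPerTable, filePath, curDataSize);
      byte[] chunkBytes = serializedChunk.chunkData.toByteArray();
      blob.write(chunkBytes);
      curDataSize += chunkBytes.length;
    }
    return blob.toByteArray();
  }
}

Because offsets are strictly increasing within a blob, every chunk after the first gets a distinct suffix.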
@@ -20,12 +20,15 @@ public interface Flusher<T> {
   /**
    * Serialize buffered rows into the underlying format.
    *
    * @param channelsDataPerTable buffered rows
    * @param filePath file path
+   * @param chunkStartOffset start offset of this chunk within the output file; used to give each
+   *     chunk in an interleaved file a unique file id key
    * @return {@link SerializationResult}
    * @throws IOException
    */
-  SerializationResult serialize(List<ChannelData<T>> channelsDataPerTable, String filePath)
+  SerializationResult serialize(
+      List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset)
       throws IOException;
 
   /** Holds result of the buffered rows conversion: channel metadata and stats. */
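Every Flusher implementation now has to accept the offset, even if it ignores it. A minimal sketch of a conforming implementer (a hypothetical test double, not part of the SDK, assumed to live alongside the internal Flusher and ChannelData types):

import java.io.IOException;
import java.util.List;

class OffsetRecordingFlusher<T> implements Flusher<T> {
  long lastChunkStartOffset = -1L;

  @Override
  public SerializationResult serialize(
      List<ChannelData<T>> channelsDataPerTable, String filePath, long chunkStartOffset)
      throws IOException {
    // Record the offset the caller passed; 0 means this is the first chunk in the blob.
    this.lastChunkStartOffset = chunkStartOffset;
    // A real implementation would serialize the rows and return the chunk bytes plus
    // per-channel metadata and stats here.
    return null;
  }
}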
@@ -16,6 +16,7 @@
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.Pair;
 import net.snowflake.ingest.utils.SFException;
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.hadoop.SnowflakeParquetWriter;
 import org.apache.parquet.schema.MessageType;
@@ -55,13 +56,17 @@ public ParquetFlusher(
 
   @Override
   public SerializationResult serialize(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable,
+      String filePath,
+      long chunkStartOffset)
       throws IOException {
-    return serializeFromJavaObjects(channelsDataPerTable, filePath);
+    return serializeFromJavaObjects(channelsDataPerTable, filePath, chunkStartOffset);
   }
 
   private SerializationResult serializeFromJavaObjects(
-      List<ChannelData<ParquetChunkData>> channelsDataPerTable, String filePath)
+      List<ChannelData<ParquetChunkData>> channelsDataPerTable,
+      String filePath,
+      long chunkStartOffset)
       throws IOException {
     List<ChannelMetadata> channelsMetadataList = new ArrayList<>();
     long rowCount = 0L;
@@ -127,15 +132,7 @@ private SerializationResult serializeFromJavaObjects(
     }
 
     Map<String, String> metadata = channelsDataPerTable.get(0).getVectors().metadata;
-    // We insert the filename in the file itself as metadata so that streams can work on replicated
-    // tables. For a more detailed discussion on the topic see SNOW-561447,
-    // http://go/streams-on-replicated-mixed-tables, and
-    // http://go/managed-iceberg-replication-change-tracking
-    metadata.put(
-        enableIcebergStreaming
-            ? Constants.ASSIGNED_FULL_FILE_NAME_KEY
-            : Constants.PRIMARY_FILE_ID_KEY,
-        StreamingIngestUtils.getShortname(filePath));
+    addFileIdToMetadata(filePath, chunkStartOffset, metadata);
     parquetWriter =
         new SnowflakeParquetWriter(
             mergedData,
@@ -162,6 +159,33 @@ private SerializationResult serializeFromJavaObjects(
         parquetWriter.getExtendedMetadataSize());
   }
 
+  private void addFileIdToMetadata(
+      String filePath, long chunkStartOffset, Map<String, String> metadata) {
+    // We insert the filename in the file itself as metadata so that streams can work on replicated
+    // mixed tables. For a more detailed discussion on the topic see SNOW-561447,
+    // http://go/streams-on-replicated-mixed-tables, and
+    // http://go/managed-iceberg-replication-change-tracking
+    // Using the chunk offset as a suffix ensures that for interleaved tables, the file id key is
+    // unique for each chunk. Each chunk is logically a separate Parquet file that happens to be
+    // bundled together with the others.
+    if (chunkStartOffset == 0) {
+      metadata.put(
+          enableIcebergStreaming
+              ? Constants.ASSIGNED_FULL_FILE_NAME_KEY
+              : Constants.PRIMARY_FILE_ID_KEY,
+          StreamingIngestUtils.getShortname(filePath));
+    } else {
+      Preconditions.checkState(
+          !enableIcebergStreaming, "Iceberg streaming is not supported with non-zero offsets");
+      String shortName = StreamingIngestUtils.getShortname(filePath);
+      final String[] parts = shortName.split("\\.");
+      Preconditions.checkState(parts.length == 2, "Invalid file name format");
+      metadata.put(
+          Constants.PRIMARY_FILE_ID_KEY,
+          String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1]));
+    }
+  }
+
   /**
    * Validates that the row count in metadata matches the row count in the Parquet footer and the
    * row count written by the parquet writer
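To see the effect of the new key concretely: for a non-zero chunk start offset, the offset is spliced into the short file name before the extension. The following standalone helper is hypothetical and simply mirrors the rule in addFileIdToMetadata above rather than calling it:

public class ChunkFileIdExample {
  // "<name>.<ext>" plus a non-zero offset becomes "<name>_<offset>.<ext>".
  static String fileIdForChunk(String shortName, long chunkStartOffset) {
    if (chunkStartOffset == 0) {
      return shortName; // the first chunk keeps the plain short file name
    }
    String[] parts = shortName.split("\\.");
    return String.format("%s_%d.%s", parts[0], chunkStartOffset, parts[1]);
  }

  public static void main(String[] args) {
    System.out.println(fileIdForChunk("abc123.bdec", 0));  // abc123.bdec
    System.out.println(fileIdForChunk("abc123.bdec", 13)); // abc123_13.bdec
  }
}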
@@ -20,9 +20,11 @@
 import java.util.Set;
 import java.util.function.Consumer;
 import net.snowflake.client.jdbc.internal.google.common.collect.Sets;
+import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.connection.TelemetryService;
 import net.snowflake.ingest.streaming.OffsetTokenVerificationFunction;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
+import net.snowflake.ingest.utils.Constants;
 import net.snowflake.ingest.utils.ErrorCode;
 import net.snowflake.ingest.utils.IcebergDataTypeParser;
 import net.snowflake.ingest.utils.SFException;
@@ -89,6 +91,7 @@ public void setupSchema(List<ColumnMetadata> columns) {
     if (!clientBufferParameters.isEnableIcebergStreaming()) {
       metadata.put("sfVer", "1,1");
     }
+    metadata.put(Constants.SDK_VERSION_KEY, RequestBuilder.DEFAULT_VERSION);
     List<Type> parquetTypes = new ArrayList<>();
     int id = 1;
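Both the file id key and the new sdkVersion entry are written into the Parquet footer's key-value metadata, so any Parquet reader can inspect them; the SDK's own tests do so through BdecParquetReader.getKeyValueMetadata(), as shown further down. A sketch with stock parquet-hadoop, using a placeholder file path:

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class FooterMetadataExample {
  public static void main(String[] args) throws Exception {
    // "/tmp/chunk.parquet" is a placeholder for a chunk that has already been written to disk.
    try (ParquetFileReader reader =
        ParquetFileReader.open(
            HadoopInputFile.fromPath(new Path("/tmp/chunk.parquet"), new Configuration()))) {
      Map<String, String> kv = reader.getFooter().getFileMetaData().getKeyValueMetaData();
      System.out.println("primaryFileId = " + kv.get("primaryFileId"));
      System.out.println("sdkVersion    = " + kv.get("sdkVersion"));
    }
  }
}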
1 change: 1 addition & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
@@ -37,6 +37,7 @@ public class Constants {
   public static final String PRIMARY_FILE_ID_KEY =
       "primaryFileId"; // Don't change, should match Parquet Scanner
   public static final String ASSIGNED_FULL_FILE_NAME_KEY = "assignedFullFileName";
+  public static final String SDK_VERSION_KEY = "sdkVersion";
   public static final long RESPONSE_SUCCESS = 0L; // Don't change, should match server side
   public static final long RESPONSE_ERR_GENERAL_EXCEPTION_RETRY_REQUEST =
       10L; // Don't change, should match server side
@@ -487,8 +487,9 @@ public void testRefreshSnowflakeMetadataSynchronized() throws Exception {
               throw new RuntimeException(e);
             }
           });
+    workers.shutdown();
 
-    workers.awaitTermination(150, TimeUnit.MILLISECONDS);
+    Assert.assertTrue(workers.awaitTermination(1, TimeUnit.SECONDS));
 
     Mockito.verify(mockClient).execute(Mockito.any());
   }
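The test change follows the standard ExecutorService teardown idiom: awaitTermination can only return true after shutdown() has been called and all submitted tasks have finished, so asserting on its return value catches hung workers instead of silently timing out. A generic java.util.concurrent sketch, unrelated to the SDK:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownExample {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService workers = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 4; i++) {
      workers.submit(() -> System.out.println("task on " + Thread.currentThread().getName()));
    }
    workers.shutdown(); // stop accepting new tasks; required before awaitTermination can return true
    boolean finished = workers.awaitTermination(1, TimeUnit.SECONDS);
    if (!finished) {
      throw new AssertionError("workers did not terminate within the timeout");
    }
  }
}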
@@ -27,6 +27,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import net.snowflake.ingest.connection.RequestBuilder;
 import net.snowflake.ingest.streaming.InsertValidationResponse;
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.utils.Constants;
@@ -2028,18 +2029,50 @@ public void testParquetFileNameMetadata() throws IOException {
     data.setChannelContext(new ChannelFlushContext("name", "db", "schema", "table", 1L, "key", 0L));
 
     ParquetFlusher flusher = (ParquetFlusher) bufferUnderTest.createFlusher();
-    Flusher.SerializationResult result =
-        flusher.serialize(Collections.singletonList(data), filePath);
-
-    BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
-    Assert.assertEquals(
-        filePath,
-        reader
-            .getKeyValueMetadata()
-            .get(
-                enableIcebergStreaming
-                    ? Constants.ASSIGNED_FULL_FILE_NAME_KEY
-                    : Constants.PRIMARY_FILE_ID_KEY));
+    {
+      Flusher.SerializationResult result =
+          flusher.serialize(Collections.singletonList(data), filePath, 0);
+
+      BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
+      Assert.assertEquals(
+          "testParquetFileNameMetadata.bdec",
+          reader
+              .getKeyValueMetadata()
+              .get(
+                  enableIcebergStreaming
+                      ? Constants.ASSIGNED_FULL_FILE_NAME_KEY
+                      : Constants.PRIMARY_FILE_ID_KEY));
+      Assert.assertEquals(
+          RequestBuilder.DEFAULT_VERSION,
+          reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
+    }
+    {
+      try {
+        Flusher.SerializationResult result =
+            flusher.serialize(Collections.singletonList(data), filePath, 13);
+        if (enableIcebergStreaming) {
+          Assert.fail(
+              "Should have thrown an exception because iceberg streams do not support offsets");
+        }
+
+        BdecParquetReader reader = new BdecParquetReader(result.chunkData.toByteArray());
+        Assert.assertEquals(
+            "testParquetFileNameMetadata_13.bdec",
+            reader
+                .getKeyValueMetadata()
+                .get(
+                    enableIcebergStreaming
+                        ? Constants.ASSIGNED_FULL_FILE_NAME_KEY
+                        : Constants.PRIMARY_FILE_ID_KEY));
+        Assert.assertEquals(
+            RequestBuilder.DEFAULT_VERSION,
+            reader.getKeyValueMetadata().get(Constants.SDK_VERSION_KEY));
+      } catch (IllegalStateException ex) {
+        if (!enableIcebergStreaming) {
+          throw ex;
+        }
+      }
+    }
   }
 
   @Test
