SNOW-1708577 Parquet V2 support for new table format #851
Conversation
if (internalParameterProvider.getEnableChunkEncryption()) {
  Pair<byte[], Integer> paddedChunk =
      padChunk(serializedChunk.chunkData, Constants.ENCRYPTION_ALGORITHM_BLOCK_SIZE_BYTES);
  byte[] paddedChunkData = paddedChunk.getFirst();
  chunkLength = paddedChunk.getSecond();
...
if (internalParameterProvider.getComputeExtendedMetadataSize()) {
  extendedMetadataSize =
This is risky in that (a) the behavior of paddedChunkData can change and break the assumption here of no padding at the tail, and (b) there is no guaranteed way to keep a corrupt value from flowing out.
- Let's extract the extended metadata size before padding.
- Verify that the last four bytes are == ParquetFileWriter.MAGIC.
- Is there nothing on ParquetFileWriter that exposes the extendedMetadataSize?
- The chunkLength returned by padChunk is the length before padding, so the offset should be correct.
- Added a check in Utils.getExtendedMetadataSize.
- AFAIK the Parquet file writer only logs this value (ref).
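For reference, a minimal sketch of what such a check could look like, assuming the standard Parquet footer layout (a 4-byte little-endian footer length followed by the "PAR1" magic) and that the "extended metadata size" is that footer length; the SDK's actual Utils.getExtendedMetadataSize may differ.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

final class FooterSizeSketch {
  private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

  // Reads the footer (metadata) length from the tail of an unpadded Parquet chunk and
  // fails fast if the chunk does not actually end with the Parquet magic bytes.
  static int getExtendedMetadataSize(byte[] chunk, int chunkLength) throws IOException {
    if (chunkLength < MAGIC.length + Integer.BYTES) {
      throw new IOException("Chunk is too small to contain a Parquet footer");
    }
    for (int i = 0; i < MAGIC.length; i++) {
      if (chunk[chunkLength - MAGIC.length + i] != MAGIC[i]) {
        throw new IOException("Chunk does not end with the Parquet magic bytes");
      }
    }
    // The 4 bytes preceding the magic hold the footer length, little-endian.
    return ByteBuffer.wrap(chunk, chunkLength - MAGIC.length - Integer.BYTES, Integer.BYTES)
        .order(ByteOrder.LITTLE_ENDIAN)
        .getInt();
  }
}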
@@ -74,6 +74,9 @@ public class Constants {
  public static final int PARQUET_MAJOR_VERSION = 1;
Remove this hardcoded version and read the actual version from one of the parquet writers / parquet config classes?
Use the major version from ParquetFileWriter.CURRENT_VERSION. Keep the minor version constant as I cannot find one in ParquetFileWriter.
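For context, a small sketch of where the value would come from; PARQUET_MINOR_VERSION and its value are assumptions here, since parquet-mr does not appear to expose a minor-version counterpart.

import org.apache.parquet.hadoop.ParquetFileWriter;

final class ParquetVersionSketch {
  // The major version comes from parquet-mr's own constant; the minor version stays a
  // local constant because there is no public counterpart in ParquetFileWriter.
  static final int PARQUET_MAJOR_VERSION = ParquetFileWriter.CURRENT_VERSION;
  static final int PARQUET_MINOR_VERSION = 0; // assumed value, kept on the SDK side
}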
for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
  RowBufferStats stat = colStat.getValue();
  enableDistinctValues = stat.isEnableDistinctValue();
This looks odd to me: we are inferring an outer-class characteristic from an inner object's state.
Let's take in a boolean alongside setDefaultValues.
In fact you can read this.clientBufferParameters.enableDistinctValues and don't even need an argument here.
Moved the NDV flag to a parameter. Cannot use this.clientBufferParameters because this is a static method called by a chain of static methods.
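A minimal illustration of that shape, with hypothetical names rather than the SDK's actual signatures: the client-level NDV setting is threaded through the static call chain as an explicit argument instead of being read off a per-column RowBufferStats instance.

import java.util.Map;

final class EpInfoSketch {
  static final class ColumnStats {
    final long distinctCount;
    ColumnStats(long distinctCount) { this.distinctCount = distinctCount; }
  }

  // The caller, which knows the client-level setting, passes it in; nothing is inferred
  // from whichever column entry happens to be iterated first.
  static void setDefaultValues(Map<String, ColumnStats> colStats, boolean enableDistinctValues) {
    for (Map.Entry<String, ColumnStats> e : colStats.entrySet()) {
      long ndv = enableDistinctValues ? e.getValue().distinctCount : -1; // -1 marks NDV disabled
      System.out.printf("column=%s ndv=%d%n", e.getKey(), ndv);
    }
  }
}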
@@ -131,6 +161,9 @@ void addStrValue(String value) {

void addBinaryValue(byte[] valueBytes) {
  this.setCurrentMaxLength(valueBytes.length);
  if (enableDistinctValue) {
    distinctValues.add(new String(Base64.getEncoder().encode(valueBytes)));
Ouch, this will kill performance, both CPU and memory. If I'm not wrong we are already doing heavy validation of input strings; I'm hoping we can reuse that single pass over the whole string.
Another idea is to keep a Map<String, Set<Integer>> here for string columns: the map key can be a CRC32 hash and the map value a set of row ids that contain a value with that hash. Only when the set has size > 1 do we look at the actual strings, before serialization. That is a non-trivial amount of changes all over, though.
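A rough sketch of that bucketing idea, with assumed names and not what this PR implements: values are grouped by CRC32 hash, and only buckets holding more than one row id would need their actual strings compared before serialization.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.zip.CRC32;

final class NdvByHashSketch {
  // hash of the value -> ids of the rows whose value produced that hash
  private final Map<Long, Set<Integer>> buckets = new HashMap<>();

  void add(int rowId, byte[] valueBytes) {
    CRC32 crc = new CRC32();
    crc.update(valueBytes);
    buckets.computeIfAbsent(crc.getValue(), h -> new HashSet<>()).add(rowId);
  }

  // A single-row bucket contributes exactly one distinct value; a multi-row bucket is either
  // one repeated value or a hash collision, so only those rows need a real string comparison.
  long distinctHashCount() {
    return buckets.size();
  }
}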
Discussed offline. Let's keep NDV disabled by default; when it's enabled, skip the string NDV computation. Added a JIRA for this.
switch (this) {
  case NON_ICEBERG:
  case COMPATIBLE:
    return ParquetProperties.WriterVersion.PARQUET_1_0;
Does this mean that non-Iceberg tables (which are Snowflake-managed tables AFAIK) only support Parquet V1?
The server-side scanner for FDN tables supports Parquet V2. This PR is specific to the Iceberg table feature and does not alter the default behavior for streaming to FDN tables.
Thanks!
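For readers following along, a hedged reading of the full mapping this switch implies. The OPTIMIZED branch is not shown in the snippet above, so its Parquet V2 value is an assumption based on this PR's intent, and the enum name and location are assumed as well.

import org.apache.parquet.column.ParquetProperties;

// Assumed enum shape; NON_ICEBERG appears only because it still exists in the snippet above
// and is removed later in this PR.
enum SerializationPolicySketch {
  NON_ICEBERG,
  COMPATIBLE,
  OPTIMIZED;

  ParquetProperties.WriterVersion toWriterVersion() {
    switch (this) {
      case NON_ICEBERG:
      case COMPATIBLE:
        return ParquetProperties.WriterVersion.PARQUET_1_0;
      case OPTIMIZED:
        // Assumption: OPTIMIZED opts into Parquet V2, which is what this PR enables.
        return ParquetProperties.WriterVersion.PARQUET_2_0;
      default:
        throw new IllegalArgumentException("Unknown policy: " + this);
    }
  }
}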
1. Remove logic from the OpenChannelResponse contract class.
2. Move writerVersion defaulting to the channel construction callsite in clientInternal (from the channel ctor), instead of passing writerVersion=null into the channel.
3. Pass writerVersion around via RowBuffer into Flusher, instead of via the per-chunk flushContext.
4. Remove a test-only overload of ChannelInternal.
5. Remove an unnecessary parameter on the ChannelInternal ctor (bdecVersion).
6. Remove SerializationPolicy.NON_ICEBERG, remove the custom SerPolicy.fromName method, and use the Enum.valueOf that Java already has.
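A small sketch of item 6, assuming the enum constants mirror the server-side policy names; the exact SDK class shape may differ.

import java.util.Locale;

final class SerializationPolicyParseSketch {
  enum SerializationPolicy { COMPATIBLE, OPTIMIZED }

  // The built-in Enum.valueOf replaces the custom SerializationPolicy.fromName lookup.
  static SerializationPolicy parse(String policyName) {
    return SerializationPolicy.valueOf(policyName.trim().toUpperCase(Locale.ROOT));
  }
}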
@@ -140,7 +141,7 @@ static <T> Blob constructBlobAndMetadata(

  if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
    chunkMetadataBuilder
-       .setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
+       .setMajorVersion(ParquetFileWriter.CURRENT_VERSION)
I'd much rather depend on our own constant than a third-party library's constant. I thought I had left a comment on this but don't see it anywhere :(
OK to take in the next PR too; just remove the import whenever you revert this.
@@ -124,4 +124,8 @@ public Optional<Integer> getMaxRowGroups() {
  public String getParquetMessageTypeName() {
    return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
  }

  public boolean isEnableDictionaryEncoding() {
    return isIcebergMode;
This might depend on the storage serialization policy too; let's verify. No need to hold up the PR.
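For context, where this flag would land when building the writer properties. This is a sketch of the plumbing using parquet-mr's public builder, not the SDK's exact code.

import org.apache.parquet.column.ParquetProperties;

final class WriterPropsSketch {
  static ParquetProperties build(
      ParquetProperties.WriterVersion writerVersion, boolean enableDictionaryEncoding) {
    return ParquetProperties.builder()
        .withWriterVersion(writerVersion)
        // Dictionary encoding is toggled from the client buffer parameters; whether it should
        // also depend on the storage serialization policy is the open question above.
        .withDictionaryEncoding(enableDictionaryEncoding)
        .build();
  }
}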
@@ -43,6 +43,10 @@ public class BdecParquetWriter implements AutoCloseable {
  // Optional cap on the max number of row groups to allow per file, if this is exceeded we'll end
Nit: let's rename this file to SnowflakeParquetWriter instead of BdecParquetWriter?!
databaseName = String.format("SDK_DATATYPE_COMPATIBILITY_IT_%s", getRandomIdentifier());
conn = TestUtils.getConnection(true);
conn.createStatement().execute(String.format("create or replace database %s;", databaseName));
conn.createStatement().execute(String.format("use database %s;", databaseName));
conn.createStatement().execute(String.format("use schema %s;", schemaName));
...
switch (serializationPolicy) {
Nit: if (iceberg)?
LGTM, left some minor nits etc. that can be taken with the next PR you have in flight. Let's merge this in to unblock that PR and the other PRs that need to go in.
This PR aims to ensure consistency with the Iceberg table scanner and registration on the server side. It includes the following changes: STORAGE_SERIALIZATION_POLICY is set to OPTIMIZED.