Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Oct 8, 2024
1 parent 3ef63c0 commit 31386d5
Show file tree
Hide file tree
Showing 26 changed files with 390 additions and 253 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,21 +641,23 @@ public synchronized void close(String name) {
*
* @param rowCount: count of rows in the given buffer
* @param colStats: map of column name to RowBufferStats
* @param setDefaultValues: whether to set default values for null fields the EPs
* @param setDefaultValues: whether to set default values for null min/max field in the EPs
* @param enableDistinctValuesCount: whether to include valid NDV in the EPs
* @return the EPs built from column stats
*/
static EpInfo buildEpInfoFromStats(
long rowCount, Map<String, RowBufferStats> colStats, boolean setDefaultValues) {
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
boolean enableDistinctValues = false;
long rowCount,
Map<String, RowBufferStats> colStats,
boolean setDefaultValues,
boolean enableDistinctValuesCount) {
EpInfo epInfo = new EpInfo(rowCount, new HashMap<>(), enableDistinctValuesCount);
for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
RowBufferStats stat = colStat.getValue();
enableDistinctValues = stat.isEnableDistinctValue();
FileColumnProperties dto = new FileColumnProperties(stat, setDefaultValues);
String colName = colStat.getValue().getColumnDisplayName();
epInfo.getColumnEps().put(colName, dto);
}
epInfo.verifyEpInfo(enableDistinctValues);
epInfo.verifyEpInfo();
return epInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import net.snowflake.ingest.utils.Cryptor;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.ParameterProvider;
import net.snowflake.ingest.utils.Utils;
import org.apache.commons.codec.binary.Hex;
import org.apache.parquet.hadoop.ParquetFileWriter;

/**
* Build a single blob file that contains file header plus data. The header will be a
Expand Down Expand Up @@ -68,6 +70,7 @@ static <T> Blob constructBlobAndMetadata(
String filePath,
List<List<ChannelData<T>>> blobData,
Constants.BdecVersion bdecVersion,
ParameterProvider parameterProvider,
InternalParameterProvider internalParameterProvider)
throws IOException, NoSuchPaddingException, NoSuchAlgorithmException,
InvalidAlgorithmParameterException, InvalidKeyException, IllegalBlockSizeException,
Expand Down Expand Up @@ -98,13 +101,6 @@ static <T> Blob constructBlobAndMetadata(
byte[] paddedChunkData = paddedChunk.getFirst();
chunkLength = paddedChunk.getSecond();

if (internalParameterProvider.getComputeExtendedMetadataSize()) {
extendedMetadataSize =
Utils.getLittleEndianInt(
paddedChunkData,
chunkLength - Constants.PARQUET_MAGIC_BYTES_LENGTH - Integer.BYTES);
}

// Encrypt the compressed chunk data, the encryption key is derived using the key from
// server with the full blob path.
// We need to maintain IV as a block counter for the whole file, even interleaved,
Expand All @@ -120,11 +116,8 @@ static <T> Blob constructBlobAndMetadata(
chunkLength = compressedChunkData.length;
compressedChunkDataSize = chunkLength;

if (internalParameterProvider.getComputeExtendedMetadataSize()) {
extendedMetadataSize =
Utils.getLittleEndianInt(
compressedChunkData,
chunkLength - Constants.PARQUET_MAGIC_BYTES_LENGTH - Integer.BYTES);
if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
extendedMetadataSize = Utils.getExtendedMetadataSize(compressedChunkData, chunkLength);
}
}

Expand All @@ -150,13 +143,14 @@ static <T> Blob constructBlobAndMetadata(
AbstractRowBuffer.buildEpInfoFromStats(
serializedChunk.rowCount,
serializedChunk.columnEpStatsMapCombined,
internalParameterProvider.setDefaultValuesInEp()))
internalParameterProvider.setDefaultValuesInEp(),
parameterProvider.isEnableDistinctValuesCount()))
.setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
.setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond());

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
chunkMetadataBuilder
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMajorVersion(ParquetFileWriter.CURRENT_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
// set createdOn in seconds
.setCreatedOn(System.currentTimeMillis() / 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming.internal;

import net.snowflake.ingest.utils.Utils;
import org.apache.parquet.column.ParquetProperties;

/**
* Channel immutable identification and encryption attributes.
Expand All @@ -29,14 +30,17 @@ class ChannelFlushContext {
// Data encryption key id
private final Long encryptionKeyId;

private final ParquetProperties.WriterVersion parquetWriterVersion;

ChannelFlushContext(
String name,
String dbName,
String schemaName,
String tableName,
Long channelSequencer,
String encryptionKey,
Long encryptionKeyId) {
Long encryptionKeyId,
ParquetProperties.WriterVersion parquetWriterVersion) {
this.name = name;
this.fullyQualifiedName =
Utils.getFullyQualifiedChannelName(dbName, schemaName, tableName, name);
Expand All @@ -47,6 +51,7 @@ class ChannelFlushContext {
this.channelSequencer = channelSequencer;
this.encryptionKey = encryptionKey;
this.encryptionKeyId = encryptionKeyId;
this.parquetWriterVersion = parquetWriterVersion;
}

@Override
Expand Down Expand Up @@ -115,4 +120,8 @@ String getEncryptionKey() {
Long getEncryptionKeyId() {
return encryptionKeyId;
}

ParquetProperties.WriterVersion getParquetWriterVersion() {
return parquetWriterVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Optional;
import net.snowflake.ingest.utils.Constants;
import net.snowflake.ingest.utils.ParameterProvider;
import org.apache.parquet.column.ParquetProperties;

/** Channel's buffer relevant parameters that are set at the owning client level. */
public class ClientBufferParameters {
Expand All @@ -24,12 +23,10 @@ public class ClientBufferParameters {

private final Optional<Integer> maxRowGroups;

private ParquetProperties.WriterVersion parquetWriterVersion;

private boolean enableDictionaryEncoding;

private boolean isIcebergMode;

private boolean enableDistinctValuesCount;

/**
* Private constructor used for test methods
*
Expand All @@ -43,17 +40,15 @@ private ClientBufferParameters(
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
ParquetProperties.WriterVersion parquetWriterVersion,
boolean enableDictionaryEncoding,
boolean isIcebergMode) {
boolean isIcebergMode,
boolean enableDistinctValuesCount) {
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
this.bdecParquetCompression = bdecParquetCompression;
this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
this.maxRowGroups = maxRowGroups;
this.parquetWriterVersion = parquetWriterVersion;
this.enableDictionaryEncoding = enableDictionaryEncoding;
this.isIcebergMode = isIcebergMode;
this.enableDistinctValuesCount = enableDistinctValuesCount;
}

/** @param clientInternal reference to the client object where the relevant parameters are set */
Expand All @@ -74,14 +69,6 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic()
: ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
this.parquetWriterVersion =
clientInternal != null
? clientInternal.getParameterProvider().getParquetWriterVersion()
: ParameterProvider.PARQUET_WRITER_VERSION_DEFAULT;
this.enableDictionaryEncoding =
clientInternal != null
? clientInternal.getParameterProvider().isEnableParquetDictionaryEncoding()
: ParameterProvider.ENABLE_PARQUET_DICTIONARY_ENCODING_DEFAULT;
this.maxRowGroups =
isIcebergMode
? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT)
Expand All @@ -90,6 +77,10 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInter
clientInternal != null
? clientInternal.isIcebergMode()
: ParameterProvider.IS_ICEBERG_MODE_DEFAULT;
this.enableDistinctValuesCount =
clientInternal != null
? clientInternal.getParameterProvider().isEnableDistinctValuesCount()
: ParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT;
}

/**
Expand All @@ -104,18 +95,16 @@ public static ClientBufferParameters test_createClientBufferParameters(
Constants.BdecParquetCompression bdecParquetCompression,
boolean enableNewJsonParsingLogic,
Optional<Integer> maxRowGroups,
ParquetProperties.WriterVersion parquetWriterVersion,
boolean enableDictionaryEncoding,
boolean isIcebergMode) {
boolean isIcebergMode,
boolean enableDistinctValuesCount) {
return new ClientBufferParameters(
maxChunkSizeInBytes,
maxAllowedRowSizeInBytes,
bdecParquetCompression,
enableNewJsonParsingLogic,
maxRowGroups,
parquetWriterVersion,
enableDictionaryEncoding,
isIcebergMode);
isIcebergMode,
enableDistinctValuesCount);
}

public long getMaxChunkSizeInBytes() {
Expand Down Expand Up @@ -146,11 +135,7 @@ public String getParquetMessageTypeName() {
return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
}

public ParquetProperties.WriterVersion getParquetWriterVersion() {
return parquetWriterVersion;
}

public boolean isEnableDictionaryEncoding() {
return enableDictionaryEncoding;
public boolean isEnableDistinctValuesCount() {
return enableDistinctValuesCount;
}
}
14 changes: 10 additions & 4 deletions src/main/java/net/snowflake/ingest/streaming/internal/EpInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@ class EpInfo {

private Map<String, FileColumnProperties> columnEps;

private boolean enableDistinctValuesCount;

/** Default constructor, needed for Jackson */
EpInfo() {}

EpInfo(long rowCount, Map<String, FileColumnProperties> columnEps) {
EpInfo(
long rowCount,
Map<String, FileColumnProperties> columnEps,
boolean enableDistinctValuesCount) {
this.rowCount = rowCount;
this.columnEps = columnEps;
this.enableDistinctValuesCount = enableDistinctValuesCount;
}

/** Some basic verification logic to make sure the EP info is correct */
public void verifyEpInfo(boolean enableDistinctValues) {
public void verifyEpInfo() {
for (Map.Entry<String, FileColumnProperties> entry : columnEps.entrySet()) {
String colName = entry.getKey();
FileColumnProperties colEp = entry.getValue();
Expand All @@ -35,8 +41,8 @@ public void verifyEpInfo(boolean enableDistinctValues) {
colName, colEp.getNullCount(), rowCount));
}

// Make sure the NDV should always be -1 when the NDV is not enabled
if (!enableDistinctValues && colEp.getDistinctValues() != EP_NDV_UNKNOWN) {
// Make sure the NDV should always be -1 when the NDV set to default
if (!enableDistinctValuesCount && colEp.getDistinctValues() != EP_NDV_UNKNOWN) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,11 +597,14 @@ BlobMetadata buildAndUpload(
InvalidKeyException {
Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency);

InternalParameterProvider paramProvider = this.owningClient.getInternalParameterProvider();
// Construct the blob along with the metadata of the blob
BlobBuilder.Blob blob =
BlobBuilder.constructBlobAndMetadata(
blobPath.fileName, blobData, bdecVersion, paramProvider);
blobPath.fileName,
blobData,
bdecVersion,
this.owningClient.getParameterProvider(),
this.owningClient.getInternalParameterProvider());

blob.blobStats.setBuildDurationMs(buildContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,4 @@ boolean setIcebergSpecificFieldsInEp() {
// in the EP metadata, createdOn, and extendedMetadataSize.
return isIcebergMode;
}

boolean getComputeExtendedMetadataSize() {
// When in Iceberg mode, extendedMetadataSize is computed. Otherwise, it is -1.
return isIcebergMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
import net.snowflake.ingest.utils.Constants;

/** Response to the OpenChannelRequest */
class OpenChannelResponse extends StreamingIngestResponse {
Expand All @@ -22,6 +23,7 @@ class OpenChannelResponse extends StreamingIngestResponse {
private String encryptionKey;
private Long encryptionKeyId;
private FileLocationInfo icebergLocationInfo;
private String icebergSerializationPolicy;

@JsonProperty("status_code")
void setStatusCode(Long statusCode) {
Expand Down Expand Up @@ -140,4 +142,13 @@ void setIcebergLocationInfo(FileLocationInfo icebergLocationInfo) {
FileLocationInfo getIcebergLocationInfo() {
return this.icebergLocationInfo;
}

@JsonProperty("iceberg_serialization_policy")
void setIcebergSerializationPolicy(String icebergSerializationPolicy) {
this.icebergSerializationPolicy = icebergSerializationPolicy;
}

Constants.IcebergSerializationPolicy getIcebergSerializationPolicy() {
return Constants.IcebergSerializationPolicy.fromName(this.icebergSerializationPolicy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class ParquetFlusher implements Flusher<ParquetChunkData> {
private final Optional<Integer> maxRowGroups;

private final Constants.BdecParquetCompression bdecParquetCompression;
private final ParquetProperties.WriterVersion parquetWriterVersion;
private final boolean enableDictionaryEncoding;

/** Construct parquet flusher from its schema. */
Expand All @@ -40,13 +39,11 @@ public ParquetFlusher(
long maxChunkSizeInBytes,
Optional<Integer> maxRowGroups,
Constants.BdecParquetCompression bdecParquetCompression,
ParquetProperties.WriterVersion parquetWriterVersion,
boolean enableDictionaryEncoding) {
this.schema = schema;
this.maxChunkSizeInBytes = maxChunkSizeInBytes;
this.maxRowGroups = maxRowGroups;
this.bdecParquetCompression = bdecParquetCompression;
this.parquetWriterVersion = parquetWriterVersion;
this.enableDictionaryEncoding = enableDictionaryEncoding;
}

Expand All @@ -69,6 +66,7 @@ private SerializationResult serializeFromJavaObjects(
BdecParquetWriter parquetWriter;
ByteArrayOutputStream mergedData = new ByteArrayOutputStream();
Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;
ParquetProperties.WriterVersion parquetWriterVersion = null;

for (ChannelData<ParquetChunkData> data : channelsDataPerTable) {
// Create channel metadata
Expand Down Expand Up @@ -110,6 +108,15 @@ private SerializationResult serializeFromJavaObjects(
chunkMinMaxInsertTimeInMs, data.getMinMaxInsertTimeInMs());
}

// Check if all the channels have the same parquet writer version
if (parquetWriterVersion == null) {
parquetWriterVersion = data.getChannelContext().getParquetWriterVersion();
} else if (!parquetWriterVersion.equals(data.getChannelContext().getParquetWriterVersion())) {
throw new SFException(
ErrorCode.INTERNAL_ERROR,
"Parquet writer version and storage serialization policy mismatch within a chunk");
}

rows.addAll(data.getVectors().rows);

rowCount += data.getRowCount();
Expand Down
Loading

0 comments on commit 31386d5

Please sign in to comment.