Commit bef322d

Commit message: done

sfc-gh-alhuang committed Oct 15, 2024
1 parent 6d6ba94 commit bef322d
Showing 24 changed files with 911 additions and 215 deletions.
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
 */

 package net.snowflake.ingest.streaming.internal;
@@ -642,13 +642,17 @@ public synchronized void close(String name) {
    *
    * @param rowCount: count of rows in the given buffer
    * @param colStats: map of column name to RowBufferStats
-   * @param setAllDefaultValues: whether to set default values for all null fields in the EPs
-   *     irrespective of the data type of this column
+   * @param setAllDefaultValues: whether to set default values for all null min/max fields in the EPs
+   * @param enableDistinctValuesCount: whether to include valid NDV in the EPs irrespective of the
+   *     data type of this column
    * @return the EPs built from column stats
    */
  static EpInfo buildEpInfoFromStats(
-      long rowCount, Map<String, RowBufferStats> colStats, boolean setAllDefaultValues) {
-    EpInfo epInfo = new EpInfo(rowCount, new HashMap<>());
+      long rowCount,
+      Map<String, RowBufferStats> colStats,
+      boolean setAllDefaultValues,
+      boolean enableDistinctValuesCount) {
+    EpInfo epInfo = new EpInfo(rowCount, new HashMap<>(), enableDistinctValuesCount);
    for (Map.Entry<String, RowBufferStats> colStat : colStats.entrySet()) {
      RowBufferStats stat = colStat.getValue();
      FileColumnProperties dto = new FileColumnProperties(stat, setAllDefaultValues);
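For context, a minimal sketch of how the widened buildEpInfoFromStats signature might be invoked by a caller that wants NDV included in the EPs; the row count and stats map here are illustrative, not taken from this commit:

    // Hypothetical call site; colStats would normally come from a flushed row buffer.
    Map<String, RowBufferStats> colStats = new HashMap<>();
    EpInfo epInfo =
        AbstractRowBuffer.buildEpInfoFromStats(
            /* rowCount= */ 1000L,
            colStats,
            /* setAllDefaultValues= */ false,
            /* enableDistinctValuesCount= */ true);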
Original file line number Diff line number Diff line change
@@ -31,6 +31,7 @@
 import net.snowflake.ingest.utils.Cryptor;
 import net.snowflake.ingest.utils.Logging;
 import net.snowflake.ingest.utils.Pair;
+import net.snowflake.ingest.utils.Utils;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.parquet.hadoop.ParquetFileWriter;

@@ -90,6 +91,7 @@ static <T> Blob constructBlobAndMetadata(
     final byte[] compressedChunkData;
     final int chunkLength;
     final int compressedChunkDataSize;
+    int extendedMetadataSize = -1;

     if (internalParameterProvider.getEnableChunkEncryption()) {
       Pair<byte[], Integer> paddedChunk =
@@ -111,6 +113,10 @@ static <T> Blob constructBlobAndMetadata(
       compressedChunkData = serializedChunk.chunkData.toByteArray();
       chunkLength = compressedChunkData.length;
       compressedChunkDataSize = chunkLength;
+
+      if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
+        extendedMetadataSize = Utils.getExtendedMetadataSize(compressedChunkData, chunkLength);
+      }
     }

     // Compute the md5 of the chunk data
@@ -135,7 +141,8 @@ static <T> Blob constructBlobAndMetadata(
             AbstractRowBuffer.buildEpInfoFromStats(
                 serializedChunk.rowCount,
                 serializedChunk.columnEpStatsMapCombined,
-                internalParameterProvider.setAllDefaultValuesInEp()))
+                internalParameterProvider.setAllDefaultValuesInEp(),
+                internalParameterProvider.isEnableDistinctValuesCount()))
         .setFirstInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getFirst())
         .setLastInsertTimeInMs(serializedChunk.chunkMinMaxInsertTimeInMs.getSecond());

@@ -145,7 +152,7 @@ static <T> Blob constructBlobAndMetadata(
             .setMinorVersion(Constants.PARQUET_MINOR_VERSION)
             // set createdOn in seconds
             .setCreatedOn(System.currentTimeMillis() / 1000)
-            .setExtendedMetadataSize(-1L);
+            .setExtendedMetadataSize((long) extendedMetadataSize);
       }

       ChunkMetadata chunkMetadata = chunkMetadataBuilder.build();
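The diff does not show the body of Utils.getExtendedMetadataSize. Assuming it follows the standard Parquet trailer layout (a 4-byte little-endian footer length followed by the 4-byte "PAR1" magic), a plausible sketch is the following; the real helper in this repository may differ:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    // Sketch only: reads the footer length from the Parquet trailer and counts
    // the serialized footer plus the trailing 8 bytes as extended metadata.
    static int getExtendedMetadataSizeSketch(byte[] parquetData, int length) throws IOException {
      final int FOOTER_LENGTH_AND_MAGIC = 8; // 4-byte footer length + 4-byte "PAR1" magic
      if (length < FOOTER_LENGTH_AND_MAGIC) {
        throw new IOException("Buffer too small to contain a Parquet trailer");
      }
      int footerLength =
          ByteBuffer.wrap(parquetData, length - FOOTER_LENGTH_AND_MAGIC, 4)
              .order(ByteOrder.LITTLE_ENDIAN)
              .getInt();
      return footerLength + FOOTER_LENGTH_AND_MAGIC;
    }

This also explains the sentinel above: extendedMetadataSize stays -1 (unknown) unless Iceberg-specific fields are requested, matching the previously hard-coded setExtendedMetadataSize(-1L).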
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
+ * Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
 */

 package net.snowflake.ingest.streaming.internal;
@@ -25,6 +25,10 @@ public class ClientBufferParameters {

   private boolean isIcebergMode;

+  private boolean enableDistinctValuesCount;
+
+  private boolean enableValuesCount;
+
   /**
    * Private constructor used for test methods
    *
@@ -38,13 +42,17 @@ private ClientBufferParameters(
       Constants.BdecParquetCompression bdecParquetCompression,
       boolean enableNewJsonParsingLogic,
       Optional<Integer> maxRowGroups,
-      boolean isIcebergMode) {
+      boolean isIcebergMode,
+      boolean enableDistinctValuesCount,
+      boolean enableValuesCount) {
     this.maxChunkSizeInBytes = maxChunkSizeInBytes;
     this.maxAllowedRowSizeInBytes = maxAllowedRowSizeInBytes;
     this.bdecParquetCompression = bdecParquetCompression;
     this.enableNewJsonParsingLogic = enableNewJsonParsingLogic;
     this.maxRowGroups = maxRowGroups;
     this.isIcebergMode = isIcebergMode;
+    this.enableDistinctValuesCount = enableDistinctValuesCount;
+    this.enableValuesCount = enableValuesCount;
   }

   /** @param clientInternal reference to the client object where the relevant parameters are set */
@@ -65,14 +73,22 @@ public ClientBufferParameters(SnowflakeStreamingIngestClientInternal clientInternal) {
         clientInternal != null
             ? clientInternal.getParameterProvider().isEnableNewJsonParsingLogic()
             : ParameterProvider.ENABLE_NEW_JSON_PARSING_LOGIC_DEFAULT;
+    this.isIcebergMode =
+        clientInternal != null
+            ? clientInternal.isIcebergMode()
+            : ParameterProvider.IS_ICEBERG_MODE_DEFAULT;
     this.maxRowGroups =
         isIcebergMode
             ? Optional.of(InternalParameterProvider.MAX_ROW_GROUP_COUNT_ICEBERG_MODE_DEFAULT)
             : Optional.empty();
-    this.isIcebergMode =
-        clientInternal != null
-            ? clientInternal.isIcebergMode()
-            : ParameterProvider.IS_ICEBERG_MODE_DEFAULT;
+    this.enableDistinctValuesCount =
+        clientInternal != null
+            ? clientInternal.getInternalParameterProvider().isEnableDistinctValuesCount()
+            : InternalParameterProvider.ENABLE_DISTINCT_VALUES_COUNT_DEFAULT;
+    this.enableValuesCount =
+        clientInternal != null
+            ? clientInternal.getInternalParameterProvider().isEnableValuesCount()
+            : InternalParameterProvider.ENABLE_VALUES_COUNT_DEFAULT;
   }

   /**
@@ -87,14 +103,18 @@ public static ClientBufferParameters test_createClientBufferParameters(
       Constants.BdecParquetCompression bdecParquetCompression,
       boolean enableNewJsonParsingLogic,
       Optional<Integer> maxRowGroups,
-      boolean isIcebergMode) {
+      boolean isIcebergMode,
+      boolean enableDistinctValuesCount,
+      boolean enableValuesCount) {
     return new ClientBufferParameters(
         maxChunkSizeInBytes,
         maxAllowedRowSizeInBytes,
         bdecParquetCompression,
         enableNewJsonParsingLogic,
         maxRowGroups,
-        isIcebergMode);
+        isIcebergMode,
+        enableDistinctValuesCount,
+        enableValuesCount);
   }

   public long getMaxChunkSizeInBytes() {
@@ -125,6 +145,14 @@ public String getParquetMessageTypeName() {
     return isIcebergMode ? PARQUET_MESSAGE_TYPE_NAME : BDEC_PARQUET_MESSAGE_TYPE_NAME;
   }

+  public boolean isEnableDistinctValuesCount() {
+    return enableDistinctValuesCount;
+  }
+
+  public boolean isEnableValuesCount() {
+    return enableValuesCount;
+  }
+
   public boolean isEnableDictionaryEncoding() {
     return isIcebergMode;
   }
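A sketch of how a test might exercise the extended factory; all values are illustrative:

    ClientBufferParameters params =
        ClientBufferParameters.test_createClientBufferParameters(
            16L * 1024 * 1024, // maxChunkSizeInBytes, illustrative
            1024 * 1024, // maxAllowedRowSizeInBytes, illustrative
            Constants.BdecParquetCompression.GZIP,
            true, // enableNewJsonParsingLogic
            Optional.empty(), // maxRowGroups
            true, // isIcebergMode
            true, // enableDistinctValuesCount
            true); // enableValuesCount
    assert params.isEnableDistinctValuesCount();
    assert params.isEnableValuesCount();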
@@ -13,12 +13,18 @@ class EpInfo {

   private Map<String, FileColumnProperties> columnEps;

+  private boolean enableDistinctValuesCount;
+
   /** Default constructor, needed for Jackson */
   EpInfo() {}

-  EpInfo(long rowCount, Map<String, FileColumnProperties> columnEps) {
+  EpInfo(
+      long rowCount,
+      Map<String, FileColumnProperties> columnEps,
+      boolean enableDistinctValuesCount) {
     this.rowCount = rowCount;
     this.columnEps = columnEps;
+    this.enableDistinctValuesCount = enableDistinctValuesCount;
   }

   /** Some basic verification logic to make sure the EP info is correct */
@@ -35,8 +41,8 @@ public void verifyEpInfo() {
                 colName, colEp.getNullCount(), rowCount));
       }

-      // Make sure the NDV is always -1
-      if (colEp.getDistinctValues() != EP_NDV_UNKNOWN) {
+      // Make sure the NDV is always -1 when distinct value counting is disabled
+      if (!enableDistinctValuesCount && colEp.getDistinctValues() != EP_NDV_UNKNOWN) {
         throw new SFException(
             ErrorCode.INTERNAL_ERROR,
             String.format(
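The relaxed check makes verification conditional on the flag. A short illustration of the intended behavior, using hypothetical rowCount and columnEps values:

    // With distinct-value counting disabled, any NDV other than EP_NDV_UNKNOWN (-1)
    // in a column's FileColumnProperties makes verifyEpInfo() throw SFException.
    EpInfo withoutNdv = new EpInfo(rowCount, columnEps, /* enableDistinctValuesCount= */ false);
    withoutNdv.verifyEpInfo();

    // With the flag enabled, concrete NDVs are accepted and pass verification.
    EpInfo withNdv = new EpInfo(rowCount, columnEps, /* enableDistinctValuesCount= */ true);
    withNdv.verifyEpInfo();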
@@ -44,6 +44,9 @@ class FileColumnProperties {

   private long nullCount;

+  // for elements in repeated columns
+  private Long numberOfValues;
+
   // for binary or string columns
   private long maxLength;
@@ -110,6 +113,7 @@ class FileColumnProperties {
     this.setMinStrNonCollated(null);
     this.setNullCount(stats.getCurrentNullCount());
     this.setDistinctValues(stats.getDistinctValues());
+    this.setNumberOfValues(stats.getNumberOfValues());
   }

   private void setIntValues(RowBufferStats stats) {
@@ -284,6 +288,16 @@ void setMaxStrNonCollated(String maxStrNonCollated) {
     this.maxStrNonCollated = maxStrNonCollated;
   }

+  @JsonProperty("numberOfValues")
+  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+  Long getNumberOfValues() {
+    return numberOfValues;
+  }
+
+  void setNumberOfValues(Long numberOfValues) {
+    this.numberOfValues = numberOfValues;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("{");
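The NON_DEFAULT inclusion means a null numberOfValues is simply omitted from the serialized EP metadata. A self-contained sketch of that Jackson behavior, using a stand-in bean rather than the real FileColumnProperties:

    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JsonIncludeDemo {
      // Minimal stand-in for the numberOfValues property.
      static class EpColumnSketch {
        private Long numberOfValues;

        @JsonProperty("numberOfValues")
        @JsonInclude(JsonInclude.Include.NON_DEFAULT)
        public Long getNumberOfValues() {
          return numberOfValues;
        }

        public void setNumberOfValues(Long n) {
          this.numberOfValues = n;
        }
      }

      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        EpColumnSketch col = new EpColumnSketch();
        System.out.println(mapper.writeValueAsString(col)); // {} -- null omitted
        col.setNumberOfValues(42L);
        System.out.println(mapper.writeValueAsString(col)); // {"numberOfValues":42}
      }
    }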
@@ -597,11 +597,13 @@ BlobMetadata buildAndUpload(
       InvalidKeyException {
     Timer.Context buildContext = Utils.createTimerContext(this.owningClient.buildLatency);

-    InternalParameterProvider paramProvider = this.owningClient.getInternalParameterProvider();
     // Construct the blob along with the metadata of the blob
     BlobBuilder.Blob blob =
         BlobBuilder.constructBlobAndMetadata(
-            blobPath.fileName, blobData, bdecVersion, paramProvider);
+            blobPath.fileName,
+            blobData,
+            bdecVersion,
+            this.owningClient.getInternalParameterProvider());

     blob.blobStats.setBuildDurationMs(buildContext);

