Skip to content

Commit

Permalink
fix npe
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-alhuang committed Oct 16, 2024
1 parent bef322d commit 4d9c70b
Show file tree
Hide file tree
Showing 19 changed files with 189 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,9 @@ public InsertValidationResponse insertRows(
// Temp stats map to use until all the rows are validated
@VisibleForTesting Map<String, RowBufferStats> tempStatsMap;

// Map of the column name to the column object, used for null/missing column check
protected final Map<String, ParquetColumn> fieldIndex;

// Lock used to protect the buffers from concurrent read/write
private final Lock flushLock;

Expand Down Expand Up @@ -352,6 +355,8 @@ public InsertValidationResponse insertRows(
// Initialize empty stats
this.statsMap = new HashMap<>();
this.tempStatsMap = new HashMap<>();

this.fieldIndex = new HashMap<>();
}

/**
Expand Down Expand Up @@ -427,7 +432,7 @@ Set<String> verifyInputColumns(
List<String> missingCols = new ArrayList<>();
for (String columnName : this.nonNullableFieldNames) {
if (!inputColNamesMap.containsKey(columnName)) {
missingCols.add(statsMap.get(columnName).getColumnDisplayName());
missingCols.add(fieldIndex.get(columnName).columnMetadata.getName());
}
}

Expand All @@ -447,7 +452,7 @@ Set<String> verifyInputColumns(
for (String columnName : this.nonNullableFieldNames) {
if (inputColNamesMap.containsKey(columnName)
&& row.get(inputColNamesMap.get(columnName)) == null) {
nullValueNotNullCols.add(statsMap.get(columnName).getColumnDisplayName());
nullValueNotNullCols.add(fieldIndex.get(columnName).columnMetadata.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import net.snowflake.ingest.utils.Cryptor;
import net.snowflake.ingest.utils.Logging;
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.Utils;
import org.apache.commons.codec.binary.Hex;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.hadoop.ParquetFileWriter;

/**
Expand Down Expand Up @@ -91,7 +91,8 @@ static <T> Blob constructBlobAndMetadata(
final byte[] compressedChunkData;
final int chunkLength;
final int compressedChunkDataSize;
int extendedMetadataSize = -1;
long extendedMetadataSize = -1;
long metadataSize = -1;

if (internalParameterProvider.getEnableChunkEncryption()) {
Pair<byte[], Integer> paddedChunk =
Expand All @@ -115,7 +116,8 @@ static <T> Blob constructBlobAndMetadata(
compressedChunkDataSize = chunkLength;

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
extendedMetadataSize = Utils.getExtendedMetadataSize(compressedChunkData, chunkLength);
metadataSize = getExtendedMetadataSize(compressedChunkData);
extendedMetadataSize = serializedChunk.extendedMetadataSize;
}
}

Expand Down Expand Up @@ -148,11 +150,12 @@ static <T> Blob constructBlobAndMetadata(

if (internalParameterProvider.setIcebergSpecificFieldsInEp()) {
chunkMetadataBuilder
.setMajorVersion(ParquetFileWriter.CURRENT_VERSION)
.setMajorVersion(Constants.PARQUET_MAJOR_VERSION)
.setMinorVersion(Constants.PARQUET_MINOR_VERSION)
// set createdOn in seconds
.setCreatedOn(System.currentTimeMillis() / 1000)
.setExtendedMetadataSize((long) extendedMetadataSize);
.setMetadataSize(metadataSize)
.setExtendedMetadataSize(extendedMetadataSize);
}

ChunkMetadata chunkMetadata = chunkMetadataBuilder.build();
Expand Down Expand Up @@ -298,4 +301,22 @@ static class Blob {
this.blobStats = blobStats;
}
}

/**
* Get the metadata size (footer size) from a parquet file
*
* @param bytes the serialized parquet file
* @return the extended metadata size
*/
static long getExtendedMetadataSize(byte[] bytes) throws IOException {
final int magicOffset = bytes.length - ParquetFileWriter.MAGIC.length;
final int footerSizeOffset = magicOffset - Integer.BYTES;
if (footerSizeOffset < 0
|| !ParquetFileWriter.MAGIC_STR.equals(
new String(bytes, magicOffset, ParquetFileWriter.MAGIC.length))) {
throw new IllegalArgumentException("Invalid parquet file");
}

return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2021-2024 Snowflake Computing Inc. All rights reserved.
*/

package net.snowflake.ingest.streaming.internal;
Expand All @@ -26,6 +26,7 @@ class ChunkMetadata {
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long metadataSize;
private Long extendedMetadataSize;

static Builder builder() {
Expand All @@ -51,6 +52,7 @@ static class Builder {
private Integer majorVersion;
private Integer minorVersion;
private Long createdOn;
private Long metadataSize;
private Long extendedMetadataSize;

Builder setOwningTableFromChannelContext(ChannelFlushContext channelFlushContext) {
Expand Down Expand Up @@ -124,6 +126,11 @@ Builder setCreatedOn(Long createdOn) {
return this;
}

Builder setMetadataSize(Long metadataSize) {
this.metadataSize = metadataSize;
return this;
}

Builder setExtendedMetadataSize(Long extendedMetadataSize) {
this.extendedMetadataSize = extendedMetadataSize;
return this;
Expand Down Expand Up @@ -258,6 +265,12 @@ Long getCreatedOn() {
return this.createdOn;
}

@JsonProperty("metadata_size")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getMetadataSize() {
return this.metadataSize;
}

@JsonProperty("ext_metadata_size")
@JsonInclude(JsonInclude.Include.NON_NULL)
Long getExtendedMetadataSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class FileColumnProperties {
private long nullCount;

// for elements in repeated columns
private Long numberOfValues;
private long numberOfValues;

// for binary or string columns
private long maxLength;
Expand Down Expand Up @@ -289,12 +289,12 @@ void setMaxStrNonCollated(String maxStrNonCollated) {
}

@JsonProperty("numberOfValues")
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
Long getNumberOfValues() {
@JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = IgnoreMinusOneFilter.class)
long getNumberOfValues() {
return numberOfValues;
}

void setNumberOfValues(Long numberOfValues) {
void setNumberOfValues(long numberOfValues) {
this.numberOfValues = numberOfValues;
}

Expand Down Expand Up @@ -360,4 +360,14 @@ public int hashCode() {
nullCount,
maxLength);
}

static class IgnoreMinusOneFilter {
@Override
public boolean equals(Object obj) {
if (obj instanceof Long) {
return (Long) obj == -1;
}
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,23 @@ class SerializationResult {
final float chunkEstimatedUncompressedSize;
final ByteArrayOutputStream chunkData;
final Pair<Long, Long> chunkMinMaxInsertTimeInMs;
final long extendedMetadataSize;

public SerializationResult(
List<ChannelMetadata> channelsMetadataList,
Map<String, RowBufferStats> columnEpStatsMapCombined,
long rowCount,
float chunkEstimatedUncompressedSize,
ByteArrayOutputStream chunkData,
Pair<Long, Long> chunkMinMaxInsertTimeInMs) {
Pair<Long, Long> chunkMinMaxInsertTimeInMs,
long extendedMetadataSize) {
this.channelsMetadataList = channelsMetadataList;
this.columnEpStatsMapCombined = columnEpStatsMapCombined;
this.rowCount = rowCount;
this.chunkEstimatedUncompressedSize = chunkEstimatedUncompressedSize;
this.chunkData = chunkData;
this.chunkMinMaxInsertTimeInMs = chunkMinMaxInsertTimeInMs;
this.extendedMetadataSize = extendedMetadataSize;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import net.snowflake.ingest.utils.Pair;
import net.snowflake.ingest.utils.SFException;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.BdecParquetWriter;
import org.apache.parquet.hadoop.SnowflakeParquetWriter;
import org.apache.parquet.schema.MessageType;

/**
Expand Down Expand Up @@ -66,7 +66,7 @@ private SerializationResult serializeFromJavaObjects(
String firstChannelFullyQualifiedTableName = null;
Map<String, RowBufferStats> columnEpStatsMapCombined = null;
List<List<Object>> rows = null;
BdecParquetWriter parquetWriter;
SnowflakeParquetWriter parquetWriter;
ByteArrayOutputStream mergedData = new ByteArrayOutputStream();
Pair<Long, Long> chunkMinMaxInsertTimeInMs = null;

Expand Down Expand Up @@ -129,7 +129,7 @@ private SerializationResult serializeFromJavaObjects(
// http://go/streams-on-replicated-mixed-tables
metadata.put(Constants.PRIMARY_FILE_ID_KEY, StreamingIngestUtils.getShortname(filePath));
parquetWriter =
new BdecParquetWriter(
new SnowflakeParquetWriter(
mergedData,
schema,
metadata,
Expand All @@ -150,7 +150,8 @@ private SerializationResult serializeFromJavaObjects(
rowCount,
chunkEstimatedUncompressedSize,
mergedData,
chunkMinMaxInsertTimeInMs);
chunkMinMaxInsertTimeInMs,
parquetWriter.getExtendedMetadataSize());
}

/**
Expand All @@ -164,7 +165,7 @@ private SerializationResult serializeFromJavaObjects(
* Used only for logging purposes if there is a mismatch.
*/
private void verifyRowCounts(
BdecParquetWriter writer,
SnowflakeParquetWriter writer,
long totalMetadataRowCount,
List<ChannelData<ParquetChunkData>> channelsDataPerTable,
long javaSerializationTotalRowCount) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
*/
public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {

private final Map<String, ParquetColumn> fieldIndex;

/* map that contains metadata like typeinfo for columns and other information needed by the server scanner */
private final Map<String, String> metadata;

Expand Down Expand Up @@ -72,7 +70,6 @@ public class ParquetRowBuffer extends AbstractRowBuffer<ParquetChunkData> {
clientBufferParameters,
offsetTokenVerificationFunction,
telemetryService);
this.fieldIndex = new HashMap<>();
this.metadata = new HashMap<>();
this.data = new ArrayList<>();
this.tempData = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package net.snowflake.ingest.streaming.internal;

import static net.snowflake.ingest.utils.Constants.EP_NDV_UNKNOWN;
import static net.snowflake.ingest.utils.Constants.EP_NV_UNKNOWN;

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -48,7 +49,7 @@ class RowBufferStats {
private final boolean enableDistinctValuesCount;
private Set<Object> distinctValues;
private final boolean enableValuesCount;
private Long numberOfValues;
private long numberOfValues;

RowBufferStats(
String columnDisplayName,
Expand Down Expand Up @@ -310,9 +311,8 @@ long getDistinctValues() {
return enableDistinctValuesCount ? distinctValues.size() : EP_NDV_UNKNOWN;
}

// TODO: change default to -1 after Oct 17
Long getNumberOfValues() {
return enableValuesCount ? numberOfValues : null;
long getNumberOfValues() {
return enableValuesCount ? numberOfValues : EP_NV_UNKNOWN;
}

String getCollationDefinitionString() {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/net/snowflake/ingest/utils/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class Constants {
public static final int MAX_STREAMING_INGEST_API_CHANNEL_RETRY = 3;
public static final int STREAMING_INGEST_TELEMETRY_UPLOAD_INTERVAL_IN_SEC = 10;
public static final long EP_NDV_UNKNOWN = -1L;
public static final long EP_NV_UNKNOWN = -1L;
public static final int MAX_OAUTH_REFRESH_TOKEN_RETRY = 3;
public static final int BINARY_COLUMN_MAX_SIZE = 8 * 1024 * 1024;
public static final int VARCHAR_COLUMN_MAX_SIZE = 16 * 1024 * 1024;
Expand All @@ -72,6 +73,7 @@ public class Constants {
public static final String DROP_CHANNEL_ENDPOINT = "/v1/streaming/channels/drop/";
public static final String REGISTER_BLOB_ENDPOINT = "/v1/streaming/channels/write/blobs/";

public static final int PARQUET_MAJOR_VERSION = 1;
public static final int PARQUET_MINOR_VERSION = 0;

/**
Expand Down
23 changes: 0 additions & 23 deletions src/main/java/net/snowflake/ingest/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import com.codahale.metrics.Timer;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
import java.io.StringReader;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
Expand All @@ -30,8 +29,6 @@
import java.util.Properties;
import net.snowflake.client.core.SFSessionProperty;
import org.apache.commons.codec.binary.Base64;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.openssl.PEMParser;
import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;
Expand Down Expand Up @@ -433,24 +430,4 @@ public static String concatDotPath(String... path) {
}
return sb.toString();
}

/**
* Get the extended metadata size (footer size) from a parquet file
*
* @param bytes the serialized parquet file
* @param length the length of the byte array without padding
* @return the extended metadata size
*/
public static int getExtendedMetadataSize(byte[] bytes, int length) throws IOException {
final int magicOffset = length - ParquetFileWriter.MAGIC.length;
final int footerSizeOffset = magicOffset - Integer.BYTES;
if (bytes.length < length
|| footerSizeOffset < 0
|| !ParquetFileWriter.MAGIC_STR.equals(
new String(bytes, magicOffset, ParquetFileWriter.MAGIC.length))) {
throw new IllegalArgumentException("Invalid parquet file");
}

return BytesUtils.readIntLittleEndian(bytes, footerSizeOffset);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Snowflake Computing Inc. All rights reserved.
* Copyright (c) 2022-2024 Snowflake Computing Inc. All rights reserved.
*/

package org.apache.parquet.hadoop;
Expand Down Expand Up @@ -82,7 +82,7 @@ public void close() throws IOException {
* @param data input data to be read first and then written with outputWriter
* @param outputWriter output parquet writer
*/
public static void readFileIntoWriter(byte[] data, BdecParquetWriter outputWriter) {
public static void readFileIntoWriter(byte[] data, SnowflakeParquetWriter outputWriter) {
try (BdecParquetReader reader = new BdecParquetReader(data)) {
for (List<Object> record = reader.read(); record != null; record = reader.read()) {
outputWriter.writeRow(record);
Expand Down
Loading

0 comments on commit 4d9c70b

Please sign in to comment.