Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLPV2_ZSTD and CLPV2_LZ4 raw forward index compression codecs. #14661

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -129,9 +129,10 @@ public class CLPForwardIndexCreatorV2 implements ForwardIndexCreator {
private final ChunkCompressionType _chunkCompressionType;

/**
* Initializes a forward index creator for the given column using the provided base directory and column statistics.
* This constructor is specifically used by {@code ForwardIndexCreatorFactory}. Unlike other immutable forward index
* constructors, this one handles the entire process of converting a mutable forward index into an immutable one.
* Initializes a forward index creator for the given column using the provided base directory, column statistics and
* chunk compressor type. This constructor is specifically used by {@code ForwardIndexCreatorFactory}. Unlike other
* immutable forward index constructors, this one handles the entire process of converting a mutable forward index
* into an immutable one.
*
* <p>The {@code columnStatistics} object passed into this constructor should contain a reference to the mutable
* forward index ({@link CLPMutableForwardIndexV2}). The data from the mutable index is efficiently copied over
@@ -142,12 +143,26 @@ public class CLPForwardIndexCreatorV2 implements ForwardIndexCreator {
* @param baseIndexDir The base directory where the forward index files will be stored.
* @param columnStatistics The column statistics containing the CLP forward index information, including a reference
* to the mutable forward index.
* @param chunkCompressionType The chunk compressor type used to compress internal data columns
* @throws IOException If there is an error during initialization or while accessing the file system.
*/
public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics)
public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics,
ChunkCompressionType chunkCompressionType)
throws IOException {
this(baseIndexDir, ((CLPStatsProvider) columnStatistics).getCLPV2Stats().getClpMutableForwardIndexV2(),
ChunkCompressionType.ZSTANDARD);
chunkCompressionType);
}

/**
* Same as above, except with chunk compressor set to ZStandard by default
* @param baseIndexDir The base directory where the forward index files will be stored.
* @param columnStatistics The column statistics containing the CLP forward index information, including a reference
* to the mutable forward index.
* @throws IOException If there is an error during initialization or while accessing the file system.
*/
public CLPForwardIndexCreatorV2(File baseIndexDir, ColumnStatistics columnStatistics)
throws IOException {
this(baseIndexDir, columnStatistics, ChunkCompressionType.ZSTANDARD);
}

/**
Original file line number Diff line number Diff line change
@@ -73,11 +73,19 @@ public static ForwardIndexCreator createIndexCreator(IndexCreationContext contex
// Dictionary disabled columns
DataType storedType = fieldSpec.getDataType().getStoredType();
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLP) {
// CLP (V1) uses hard-coded chunk compressor which is set to `PassThrough`
return new CLPForwardIndexCreatorV1(indexDir, columnName, numTotalDocs, context.getColumnStatistics());
}
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2) {
// Use the default chunk compression codec for CLP, currently configured to use ZStandard
return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics());
}
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2_ZSTD) {
return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics(), ChunkCompressionType.ZSTANDARD);
}
if (indexConfig.getCompressionCodec() == FieldConfig.CompressionCodec.CLPV2_LZ4) {
return new CLPForwardIndexCreatorV2(indexDir, context.getColumnStatistics(), ChunkCompressionType.LZ4);
}
ChunkCompressionType chunkCompressionType = indexConfig.getChunkCompressionType();
if (chunkCompressionType == null) {
chunkCompressionType = ForwardIndexType.getDefaultCompressionType(fieldSpec.getFieldType());
Original file line number Diff line number Diff line change
@@ -256,7 +256,9 @@ public MutableIndex createMutableIndex(MutableIndexContext context, ForwardIndex
// CLP (V1) always have clp encoding enabled whereas V2 is dynamic
clpMutableForwardIndex.forceClpEncoding();
return clpMutableForwardIndex;
} else if (config.getCompressionCodec() == CompressionCodec.CLPV2) {
} else if (config.getCompressionCodec() == CompressionCodec.CLPV2
|| config.getCompressionCodec() == CompressionCodec.CLPV2_ZSTD
|| config.getCompressionCodec() == CompressionCodec.CLPV2_LZ4) {
CLPMutableForwardIndexV2 clpMutableForwardIndex =
new CLPMutableForwardIndexV2(column, context.getMemoryManager());
return clpMutableForwardIndex;
Original file line number Diff line number Diff line change
@@ -1204,10 +1204,12 @@ private static void validateFieldConfigList(TableConfig tableConfig, @Nullable S
switch (encodingType) {
case RAW:
Preconditions.checkArgument(compressionCodec == null || compressionCodec.isApplicableToRawIndex()
|| compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2,
|| compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2
|| compressionCodec == CompressionCodec.CLPV2_ZSTD || compressionCodec == CompressionCodec.CLPV2_LZ4,
"Compression codec: %s is not applicable to raw index",
compressionCodec);
if ((compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2)
if ((compressionCodec == CompressionCodec.CLP || compressionCodec == CompressionCodec.CLPV2
|| compressionCodec == CompressionCodec.CLPV2_ZSTD || compressionCodec == CompressionCodec.CLPV2_LZ4)
&& schema != null) {
Preconditions.checkArgument(
schema.getFieldSpecFor(columnName).getDataType().getStoredType() == DataType.STRING,
Original file line number Diff line number Diff line change
@@ -114,12 +114,12 @@ public void testCLPWriter()
Assert.assertTrue((float) rawStringFwdIndexSizeZSTD / clpFwdIndexSizeZSTD >= 0.19);
}

private long createStringRawForwardIndex(ChunkCompressionType compressionType, int maxLength)
private long createStringRawForwardIndex(ChunkCompressionType chunkCompressionType, int maxLength)
throws IOException {
// Create a raw string immutable forward index
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
SingleValueVarByteRawIndexCreator index =
new SingleValueVarByteRawIndexCreator(TEMP_DIR, compressionType, COLUMN_NAME, _logMessages.size(),
new SingleValueVarByteRawIndexCreator(TEMP_DIR, chunkCompressionType, COLUMN_NAME, _logMessages.size(),
FieldSpec.DataType.STRING, maxLength);
for (String logMessage : _logMessages) {
index.putString(logMessage);
@@ -132,9 +132,9 @@ private long createStringRawForwardIndex(ChunkCompressionType compressionType, i
}

private long createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2 clpMutableForwardIndexV2,
ChunkCompressionType compressionType)
ChunkCompressionType chunkCompressionType)
throws IOException {
long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, compressionType);
long indexSize = createClpImmutableForwardIndex(clpMutableForwardIndexV2, chunkCompressionType);

// Read from immutable forward index and validate the content
File indexFile = new File(TEMP_DIR, COLUMN_NAME + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
@@ -149,12 +149,12 @@ private long createAndValidateClpImmutableForwardIndex(CLPMutableForwardIndexV2
}

private long createClpImmutableForwardIndex(CLPMutableForwardIndexV2 clpMutableForwardIndexV2,
ChunkCompressionType compressionType)
ChunkCompressionType chunkCompressionType)
throws IOException {
// Create a CLP immutable forward index from mutable forward index
TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR);
CLPForwardIndexCreatorV2 clpForwardIndexCreatorV2 =
new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, compressionType);
new CLPForwardIndexCreatorV2(TEMP_DIR, clpMutableForwardIndexV2, chunkCompressionType);
for (int i = 0; i < _logMessages.size(); i++) {
clpForwardIndexCreatorV2.putString(clpMutableForwardIndexV2.getString(i));
}
Original file line number Diff line number Diff line change
@@ -76,6 +76,8 @@ public ForwardIndexConfig(@Nullable Boolean disabled, @Nullable CompressionCodec
case PASS_THROUGH:
case CLP:
case CLPV2:
case CLPV2_ZSTD:
case CLPV2_LZ4:
_chunkCompressionType = ChunkCompressionType.PASS_THROUGH;
_dictIdCompressionType = null;
break;
Original file line number Diff line number Diff line change
@@ -144,7 +144,10 @@ public enum CompressionCodec {
// CLP is a special type of compression codec that isn't generally applicable to all RAW columns and has a special
// handling for log lines (see {@link CLPForwardIndexCreatorV1} and {@link CLPForwardIndexCreatorV2)
CLP(false, false),
CLPV2(false, false);
CLPV2(false, false),
CLPV2_ZSTD(false, false),
CLPV2_LZ4(false, false);

//@formatter:on

private final boolean _applicableToRawIndex;