From 3257342dd9cbd9e591ed8bc5e573691f648403ae Mon Sep 17 00:00:00 2001
From: Marius Grama
Date: Wed, 10 May 2023 09:03:43 +0200
Subject: [PATCH] Use `ZSTD` Parquet compression codec for Delta Lake by default

---
 docs/src/main/sphinx/connector/delta-lake.md  |  2 +-
 .../plugin/deltalake/DeltaLakeConfig.java     |  2 +-
 ...stDeltaLakeAlluxioCacheFileOperations.java | 48 +++++++++----------
 ...LakeAlluxioCacheMutableTransactionLog.java | 16 +++----
 .../plugin/deltalake/TestDeltaLakeConfig.java |  2 +-
 .../TestDeltaLakeInsertCompatibility.java     |  4 +-
 6 files changed, 37 insertions(+), 37 deletions(-)

diff --git a/docs/src/main/sphinx/connector/delta-lake.md b/docs/src/main/sphinx/connector/delta-lake.md
index 2e6785c3a40a..0069b9fe65af 100644
--- a/docs/src/main/sphinx/connector/delta-lake.md
+++ b/docs/src/main/sphinx/connector/delta-lake.md
@@ -108,7 +108,7 @@ values. Typical usage does not require you to configure them.
     * `GZIP`
 
     The equivalent catalog session property is `compression_codec`.
-  - `SNAPPY`
+  - `ZSTD`
 * - `delta.max-partitions-per-writer`
   - Maximum number of partitions per writer.
   - `100`
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
index e02a6f5d8cce..1d553252dfe8 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java
@@ -80,7 +80,7 @@ public class DeltaLakeConfig
     private boolean tableStatisticsEnabled = true;
     private boolean extendedStatisticsEnabled = true;
     private boolean collectExtendedStatisticsOnWrite = true;
-    private HiveCompressionCodec compressionCodec = HiveCompressionCodec.SNAPPY;
+    private HiveCompressionCodec compressionCodec = HiveCompressionCodec.ZSTD;
     private long perTransactionMetastoreCacheMaximumSize = 1000;
     private boolean storeTableMetadataEnabled;
     private int storeTableMetadataThreads = 5;
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java
index 32bdc7c2cb7c..d7e92d3a063e 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java
@@ -95,12 +95,12 @@ public void testCacheFileOperations()
                         .add(new CacheOperation("Alluxio.writeCache", "00000000000000000002.json", 0, 658))
                         .add(new CacheOperation("InputFile.length", "00000000000000000003.json"))
                         .add(new CacheOperation("InputFile.newStream", "_last_checkpoint"))
-                        .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 220))
-                        .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 220))
-                        .add(new CacheOperation("Input.readFully", "key=p1/", 0, 220))
-                        .add(new CacheOperation("Input.readFully", "key=p2/", 0, 220))
-                        .add(new CacheOperation("Alluxio.writeCache", "key=p1/", 0, 220))
-                        .add(new CacheOperation("Alluxio.writeCache", "key=p2/", 0, 220))
+                        .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227))
+                        .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227))
+                        .add(new CacheOperation("Input.readFully", "key=p1/", 0, 227))
+                        .add(new CacheOperation("Input.readFully", "key=p2/", 0, 227))
+                        .add(new CacheOperation("Alluxio.writeCache", "key=p1/", 0, 227))
+                        .add(new CacheOperation("Alluxio.writeCache", "key=p2/", 0, 227))
                         .build());
         assertFileSystemAccesses(
                 "SELECT * FROM test_cache_file_operations",
@@ -113,8 +113,8 @@ public void testCacheFileOperations()
                         .add(new CacheOperation("InputFile.length", "00000000000000000002.json"))
                         .add(new CacheOperation("InputFile.length", "00000000000000000003.json"))
                         .add(new CacheOperation("InputFile.newStream", "_last_checkpoint"))
-                        .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 220))
-                        .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 220))
+                        .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227))
+                        .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227))
                         .build());
         assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p3', '3-xyz')", 1);
         assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p4', '4-xyz')", 1);
@@ -139,17 +139,17 @@ public void testCacheFileOperations()
                         .add(new CacheOperation("InputFile.length", "00000000000000000005.json"))
                         .add(new CacheOperation("InputFile.length", "00000000000000000006.json"))
                         .add(new CacheOperation("InputFile.newStream", "_last_checkpoint"))
-                        .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 220))
-                        .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 220))
-                        .add(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 220))
-                        .add(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 220))
-                        .add(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 220))
-                        .add(new CacheOperation("Input.readFully", "key=p3/", 0, 220))
-                        .add(new CacheOperation("Input.readFully", "key=p4/", 0, 220))
-                        .add(new CacheOperation("Input.readFully", "key=p5/", 0, 220))
-                        .add(new CacheOperation("Alluxio.writeCache", "key=p3/", 0, 220))
-                        .add(new CacheOperation("Alluxio.writeCache", "key=p4/", 0, 220))
-                        .add(new CacheOperation("Alluxio.writeCache", "key=p5/", 0, 220))
+                        .add(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227))
+                        .add(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227))
+                        .add(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 227))
+                        .add(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 227))
+                        .add(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 227))
+                        .add(new CacheOperation("Input.readFully", "key=p3/", 0, 227))
+                        .add(new CacheOperation("Input.readFully", "key=p4/", 0, 227))
+                        .add(new CacheOperation("Input.readFully", "key=p5/", 0, 227))
+                        .add(new CacheOperation("Alluxio.writeCache", "key=p3/", 0, 227))
+                        .add(new CacheOperation("Alluxio.writeCache", "key=p4/", 0, 227))
+                        .add(new CacheOperation("Alluxio.writeCache", "key=p5/", 0, 227))
                         .build());
         assertFileSystemAccesses(
                 "SELECT * FROM test_cache_file_operations",
@@ -168,11 +168,11 @@ public void testCacheFileOperations()
                         .add(new CacheOperation("InputFile.length", "00000000000000000005.json"))
                         .add(new CacheOperation("InputFile.length", "00000000000000000006.json"))
                         .add(new CacheOperation("InputFile.newStream", "_last_checkpoint"))
-                        .addCopies(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 220), 1)
-                        .addCopies(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 220), 1)
-                        .addCopies(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 220), 1)
-                        .addCopies(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 220), 1)
-                        .addCopies(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 220), 1)
+                        .addCopies(new CacheOperation("Alluxio.readCached", "key=p1/", 0, 227), 1)
+                        .addCopies(new CacheOperation("Alluxio.readCached", "key=p2/", 0, 227), 1)
+                        .addCopies(new CacheOperation("Alluxio.readCached", "key=p3/", 0, 227), 1)
+                        .addCopies(new CacheOperation("Alluxio.readCached", "key=p4/", 0, 227), 1)
+                        .addCopies(new CacheOperation("Alluxio.readCached", "key=p5/", 0, 227), 1)
                         .build());
     }
 
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java
index 910c3657acfd..04b3cbfba622 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMutableTransactionLog.java
@@ -79,12 +79,12 @@ public void testTableDataCachedWhileTransactionLogNotCached()
                         .addCopies(new CacheFileSystemTraceUtils.CacheOperation("Input.readTail", "00000000000000000002.checkpoint.parquet"), 2)
                         .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000003.json"))
                         .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "_last_checkpoint"))
-                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 220))
-                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 220))
-                        .add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p1/", 0, 220))
-                        .add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p2/", 0, 220))
-                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p1/", 0, 220))
-                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p2/", 0, 220))
+                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 227))
+                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 227))
+                        .add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p1/", 0, 227))
+                        .add(new CacheFileSystemTraceUtils.CacheOperation("Input.readFully", "key=p2/", 0, 227))
+                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p1/", 0, 227))
+                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.writeCache", "key=p2/", 0, 227))
                         .build());
         assertFileSystemAccesses(
                 "SELECT * FROM test_transaction_log_not_cached",
@@ -93,8 +93,8 @@ public void testTableDataCachedWhileTransactionLogNotCached()
                         .addCopies(new CacheFileSystemTraceUtils.CacheOperation("Input.readTail", "00000000000000000002.checkpoint.parquet"), 2)
                         .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.length", "00000000000000000003.json"))
                         .add(new CacheFileSystemTraceUtils.CacheOperation("InputFile.newStream", "_last_checkpoint"))
-                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 220))
-                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 220))
+                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p1/", 0, 227))
+                        .add(new CacheFileSystemTraceUtils.CacheOperation("Alluxio.readCached", "key=p2/", 0, 227))
                         .build());
     }
 
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
index 78750f664d20..fb4555c6fb1d 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java
@@ -61,7 +61,7 @@ public void testDefaults()
                 .setTableStatisticsEnabled(true)
                 .setExtendedStatisticsEnabled(true)
                 .setCollectExtendedStatisticsOnWrite(true)
-                .setCompressionCodec(HiveCompressionCodec.SNAPPY)
+                .setCompressionCodec(HiveCompressionCodec.ZSTD)
                 .setDeleteSchemaLocationsFallback(false)
                 .setParquetTimeZone(TimeZone.getDefault().getID())
                 .setPerTransactionMetastoreCacheMaximumSize(1000)
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeInsertCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeInsertCompatibility.java
index 25bb045d3c61..b7290e0839b0 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeInsertCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeInsertCompatibility.java
@@ -435,8 +435,8 @@ public void verifyCompressionCodecsDataProvider()
         assertThat(onTrino().executeQuery("SHOW SESSION LIKE 'delta.compression_codec'"))
                 .containsOnly(row(
                         "delta.compression_codec",
-                        "SNAPPY",
-                        "SNAPPY",
+                        "ZSTD",
+                        "ZSTD",
                         "varchar",
                         "Compression codec to use when writing new data files. Possible values: " +
                                 Stream.of(compressionCodecs())
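
With this patch, Parquet data files written by the Delta Lake connector default to
ZSTD compression. The `compression_codec` session property shown in the docs hunk
(and verified by `TestDeltaLakeInsertCompatibility` above) still overrides the codec
per session. The cache-test expectations move from 220 to 227 bytes because the tiny
ZSTD-compressed data files these tests write come out a few bytes larger than their
SNAPPY predecessors.

A minimal sketch of the new default, assuming the conventional
`getCompressionCodec()` getter that pairs with the `setCompressionCodec(...)` setter
exercised in `TestDeltaLakeConfig`, and `HiveCompressionCodec` from
`io.trino.plugin.hive` as imported by `DeltaLakeConfig` (illustration only, not part
of this patch):

import io.trino.plugin.deltalake.DeltaLakeConfig;
import io.trino.plugin.hive.HiveCompressionCodec;

public class CompressionCodecDefaultSketch
{
    public static void main(String[] args)
    {
        // After this patch, a freshly constructed config reports ZSTD
        // (previously SNAPPY).
        DeltaLakeConfig config = new DeltaLakeConfig();
        System.out.println(config.getCompressionCodec()); // ZSTD

        // The old behavior stays one property away; this mirrors setting the
        // codec back to SNAPPY in the catalog configuration.
        config.setCompressionCodec(HiveCompressionCodec.SNAPPY);
        System.out.println(config.getCompressionCodec()); // SNAPPY
    }
}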