From 11ff326a7217e6eeae3132c2eada318507b243ae Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Sat, 22 Nov 2025 21:55:15 +0000 Subject: [PATCH 1/2] Encryption for REST catalog --- .../iceberg/rest/RESTSessionCatalog.java | 14 ++ .../iceberg/rest/RESTTableOperations.java | 126 +++++++++++++++++- .../iceberg/hive/HiveTableOperations.java | 2 +- .../iceberg/spark/sql/TestCTASEncryption.java | 10 ++ .../spark/sql/TestTableEncryption.java | 42 +++++- 5 files changed, 185 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index b903f13adc09..b1c78044fa28 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -47,6 +47,8 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; @@ -159,6 +161,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog private Integer pageSize = null; private CloseableGroup closeables = null; private Set endpoints; + private KeyManagementClient kmsClient = null; public RESTSessionCatalog() { this( @@ -256,6 +259,12 @@ public void initialize(String name, Map unresolved) { mergedProps, RESTCatalogProperties.METRICS_REPORTING_ENABLED, RESTCatalogProperties.METRICS_REPORTING_ENABLED_DEFAULT); + + if (mergedProps.containsKey(CatalogProperties.ENCRYPTION_KMS_IMPL)) { + this.kmsClient = EncryptionUtil.createKmsClient(mergedProps); + this.closeables.addCloseable(this.kmsClient); + } + super.initialize(name, mergedProps); } @@ -455,6 +464,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { paths.table(finalIdentifier), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, tableMetadata, endpoints); @@ -534,6 +544,7 @@ public Table registerTable( paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, response.tableMetadata(), endpoints); @@ -793,6 +804,7 @@ public Table create() { paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, response.tableMetadata(), endpoints); @@ -820,6 +832,7 @@ public Transaction createTransaction() { paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, RESTTableOperations.UpdateType.CREATE, createChanges(meta), meta, @@ -883,6 +896,7 @@ public Transaction replaceTransaction() { paths.table(ident), Map::of, tableFileIO(context, tableConf, response.credentials()), + kmsClient, RESTTableOperations.UpdateType.REPLACE, changes.build(), base, diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index 38dabc8ae568..6aa7715f9596 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.iceberg.LocationProviders; import org.apache.iceberg.MetadataUpdate; import org.apache.iceberg.SnapshotRef; @@ -32,17 +33,25 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.UpdateRequirement; import org.apache.iceberg.UpdateRequirements; +import org.apache.iceberg.encryption.EncryptedKey; +import org.apache.iceberg.encryption.EncryptingFileIO; import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.encryption.EncryptionUtil; +import org.apache.iceberg.encryption.KeyManagementClient; +import org.apache.iceberg.encryption.PlaintextEncryptionManager; +import org.apache.iceberg.encryption.StandardEncryptionManager; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ErrorResponse; import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.util.LocationUtil; +import org.apache.iceberg.util.PropertyUtil; class RESTTableOperations implements TableOperations { private static final String METADATA_FOLDER_NAME = "metadata"; @@ -57,20 +66,37 @@ enum UpdateType { private final String path; private final Supplier> headers; private final FileIO io; + private final KeyManagementClient kmsClient; private final List createChanges; private final TableMetadata replaceBase; private final Set endpoints; private UpdateType updateType; private TableMetadata current; + private EncryptionManager encryptionManager; + private EncryptingFileIO encryptingFileIO; + private String tableKeyId; + private int encryptionDekLength; + private List encryptedKeysFromMetadata; + RESTTableOperations( RESTClient client, String path, Supplier> headers, FileIO io, + KeyManagementClient kmsClient, TableMetadata current, Set endpoints) { - this(client, path, headers, io, UpdateType.SIMPLE, Lists.newArrayList(), current, endpoints); + this( + client, + path, + headers, + io, + kmsClient, + UpdateType.SIMPLE, + Lists.newArrayList(), + current, + endpoints); } RESTTableOperations( @@ -78,6 +104,7 @@ enum UpdateType { String path, Supplier> headers, FileIO io, + KeyManagementClient kmsClient, UpdateType updateType, List createChanges, TableMetadata current, @@ -86,6 +113,7 @@ enum UpdateType { this.path = path; this.headers = headers; this.io = io; + this.kmsClient = kmsClient; this.updateType = updateType; this.createChanges = createChanges; this.replaceBase = current; @@ -95,6 +123,10 @@ enum UpdateType { this.current = current; } this.endpoints = endpoints; + + // N.B. We don't use this.current because for tables-to-be-created, because it would be null, + // and we still want encrypted properties in this case for its TableOperations. + encryptionPropsFromMetadata(current); } @Override @@ -112,6 +144,21 @@ public TableMetadata refresh() { @Override public void commit(TableMetadata base, TableMetadata metadata) { Endpoint.check(endpoints, Endpoint.V1_UPDATE_TABLE); + + if (encryption() instanceof StandardEncryptionManager) { + // Add encryption keys to the to-be-committed metadata + TableMetadata.Builder builder = TableMetadata.buildFrom(metadata); + for (Map.Entry entry : + EncryptionUtil.encryptionKeys(encryption()).entrySet()) { + builder.addEncryptionKey(entry.getValue()); + } + commitInternal(base, builder.build()); + } else { + commitInternal(base, metadata); + } + } + + private void commitInternal(TableMetadata base, TableMetadata metadata) { Consumer errorHandler; List requirements; List updates; @@ -152,6 +199,18 @@ public void commit(TableMetadata base, TableMetadata metadata) { String.format("Update type %s is not supported", updateType)); } + if (base != null) { + Set removedProps = + base.properties().keySet().stream() + .filter(key -> !metadata.properties().containsKey(key)) + .collect(Collectors.toSet()); + + if (removedProps.contains(TableProperties.ENCRYPTION_TABLE_KEY)) { + throw new IllegalArgumentException( + "Cannot remove encryption key ID from an encrypted table"); + } + } + UpdateTableRequest request = new UpdateTableRequest(requirements, updates); // the error handler will throw necessary exceptions like CommitFailedException and @@ -201,7 +260,44 @@ private boolean reconcileOnSimpleUpdate( @Override public FileIO io() { - return io; + if (tableKeyId == null) { + return io; + } + + if (encryptingFileIO == null) { + encryptingFileIO = EncryptingFileIO.combine(io, encryption()); + } + + return encryptingFileIO; + } + + @Override + public EncryptionManager encryption() { + if (encryptionManager != null) { + return encryptionManager; + } + + if (tableKeyId != null) { + if (kmsClient == null) { + throw new RuntimeException( + "Cannot create encryption manager without a key management client"); + } + + Map encryptionProperties = + ImmutableMap.of( + TableProperties.ENCRYPTION_TABLE_KEY, + tableKeyId, + TableProperties.ENCRYPTION_DEK_LENGTH, + String.valueOf(encryptionDekLength)); + + encryptionManager = + EncryptionUtil.createEncryptionManager( + encryptedKeysFromMetadata, encryptionProperties, kmsClient); + } else { + return PlaintextEncryptionManager.instance(); + } + + return encryptionManager; } private static Long expectedSnapshotIdIfSnapshotAddOnly(List updates) { @@ -241,6 +337,31 @@ private static Long expectedSnapshotIdIfSnapshotAddOnly(List upd return addedSnapshotId; } + private void encryptionPropsFromMetadata(TableMetadata metadata) { + if (metadata == null || metadata.properties() == null) { + return; + } + + encryptedKeysFromMetadata = metadata.encryptionKeys(); + + Map tableProperties = metadata.properties(); + if (tableKeyId == null) { + tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); + } + + if (tableKeyId != null && encryptionDekLength <= 0) { + encryptionDekLength = + PropertyUtil.propertyAsInt( + tableProperties, + TableProperties.ENCRYPTION_DEK_LENGTH, + TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); + } + + // Force re-creation of encryptingFileIO and encryptionManager + encryptingFileIO = null; + encryptionManager = null; + } + private TableMetadata updateCurrentMetadata(LoadTableResponse response) { // LoadTableResponse is used to deserialize the response, but config is not allowed by the REST // spec so it can be @@ -248,6 +369,7 @@ private TableMetadata updateCurrentMetadata(LoadTableResponse response) { if (current == null || !Objects.equals(current.metadataFileLocation(), response.metadataLocation())) { this.current = checkUUID(current, response.tableMetadata()); + encryptionPropsFromMetadata(this.current); } return current; diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java index a4338ec07a5f..b442e9cf8cce 100644 --- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java +++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java @@ -321,7 +321,7 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) { } if (removedProps.contains(TableProperties.ENCRYPTION_TABLE_KEY)) { - throw new RuntimeException("Cannot remove key in encrypted table"); + throw new RuntimeException("Cannot remove encryption key ID from an encrypted table"); } HMSTablePropertyHelper.updateHmsTableForIcebergTable( diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java index 6094ab0ccca5..4e681cb1ac17 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestCTASEncryption.java @@ -32,6 +32,7 @@ import org.apache.iceberg.encryption.UnitestKMS; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.spark.CatalogTestBase; @@ -57,6 +58,15 @@ protected static Object[][] parameters() { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties()) + }, + { + SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + appendCatalogEncryptionProperties( + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + .build()) } }; } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java index c71bd28706c9..6836b2f14d6d 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java @@ -44,6 +44,7 @@ import org.apache.iceberg.io.SeekableInputStream; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.spark.CatalogTestBase; @@ -69,6 +70,15 @@ protected static Object[][] parameters() { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), appendCatalogEncryptionProperties(SparkCatalogConfig.HIVE.properties()) + }, + { + SparkCatalogConfig.REST.catalogName(), + SparkCatalogConfig.REST.implementation(), + appendCatalogEncryptionProperties( + ImmutableMap.builder() + .putAll(SparkCatalogConfig.REST.properties()) + .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI)) + .build()) } }; } @@ -105,8 +115,8 @@ private static List currentDataFiles(Table table) { @TestTemplate public void testRefresh() { - catalog.initialize(catalogName, catalogConfig); - Table table = catalog.loadTable(tableIdent); + validationCatalog.initialize(catalogName, catalogConfig); + Table table = validationCatalog.loadTable(tableIdent); assertThat(currentDataFiles(table)).isNotEmpty(); @@ -118,9 +128,8 @@ public void testRefresh() { @TestTemplate public void testTransaction() { - catalog.initialize(catalogName, catalogConfig); - - Table table = catalog.loadTable(tableIdent); + validationCatalog.initialize(catalogName, catalogConfig); + Table table = validationCatalog.loadTable(tableIdent); List dataFiles = currentDataFiles(table); Transaction transaction = table.newTransaction(); @@ -134,6 +143,27 @@ public void testTransaction() { assertThat(currentDataFiles(table).size()).isEqualTo(dataFiles.size() + 1); } + @TestTemplate + public void testTransactionRetry() { + validationCatalog.initialize(catalogName, catalogConfig); + Table table = validationCatalog.loadTable(tableIdent); + + List dataFiles = currentDataFiles(table); + Transaction transaction = table.newTransaction(); + AppendFiles append = transaction.newAppend(); + + // add an arbitrary datafile + append.appendFile(dataFiles.get(0)); + + // append to the table in the meantime. use a separate load to avoid shared operations + validationCatalog.loadTable(tableIdent).newFastAppend().appendFile(dataFiles.get(0)).commit(); + + append.commit(); + transaction.commitTransaction(); + + assertThat(currentDataFiles(table).size()).isEqualTo(dataFiles.size() + 2); + } + @TestTemplate public void testInsertAndDelete() { sql("INSERT INTO %s VALUES (4, 'd', 4.0), (5, 'e', 5.0), (6, 'f', float('NaN'))", tableName); @@ -166,7 +196,7 @@ public void testInsertAndDelete() { public void testKeyDelete() { assertThatThrownBy( () -> sql("ALTER TABLE %s UNSET TBLPROPERTIES (`encryption.key-id`)", tableName)) - .hasMessageContaining("Cannot remove key in encrypted table"); + .hasMessageContaining("Cannot remove encryption key ID from an encrypted table"); } @TestTemplate From 5b7119be894f15b3f892a06c293377a48cd36037 Mon Sep 17 00:00:00 2001 From: Sreesh Maheshwar Date: Sun, 23 Nov 2025 00:37:19 +0000 Subject: [PATCH 2/2] Refresh encryption table properties --- .../apache/iceberg/rest/RESTTableOperations.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java index 6aa7715f9596..5377850aac21 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableOperations.java @@ -124,8 +124,8 @@ enum UpdateType { } this.endpoints = endpoints; - // N.B. We don't use this.current because for tables-to-be-created, because it would be null, - // and we still want encrypted properties in this case for its TableOperations. + // N.B. We don't use this.current due it being null for the CREATE update type; we still + // want encryption configured for this case. encryptionPropsFromMetadata(current); } @@ -344,12 +344,11 @@ private void encryptionPropsFromMetadata(TableMetadata metadata) { encryptedKeysFromMetadata = metadata.encryptionKeys(); + // Refresh encryption-related table properties on new/refreshed metadata Map tableProperties = metadata.properties(); - if (tableKeyId == null) { - tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); - } + tableKeyId = tableProperties.get(TableProperties.ENCRYPTION_TABLE_KEY); - if (tableKeyId != null && encryptionDekLength <= 0) { + if (tableKeyId != null) { encryptionDekLength = PropertyUtil.propertyAsInt( tableProperties, @@ -357,7 +356,7 @@ private void encryptionPropsFromMetadata(TableMetadata metadata) { TableProperties.ENCRYPTION_DEK_LENGTH_DEFAULT); } - // Force re-creation of encryptingFileIO and encryptionManager + // Force re-creation of encryption manager encryptingFileIO = null; encryptionManager = null; }