From abccdc324515031c6f7ab8db5bc50c53c198d180 Mon Sep 17 00:00:00 2001
From: William Lo
Date: Thu, 19 Dec 2024 12:32:57 -0500
Subject: [PATCH] Adds SQL support for Configurable Table Snapshot History
 (#262)

## Summary

Adds Spark SQL support for configurable table snapshot history, which controls the versioning of OpenHouse tables. The syntax is similar to retention, but the policy is defined as `HISTORY` instead. A history configuration supports both `MAX_AGE` and `VERSIONS`: all table snapshots that fall within `MAX_AGE` and within the most recent `VERSIONS` snapshots are retained.

For example, a table with `MAX_AGE=1d` retains all snapshots committed within 1 day of when the snapshot-retention job last ran. A table with `VERSIONS=5` retains the last 5 snapshots of the table, regardless of their age. If both `MAX_AGE=1d` and `VERSIONS=5` are defined, the last 5 snapshots within the last day are kept. Note: if fewer than 5 snapshots are retained, fewer than 5 commits were made in the past day.

`MAX_AGE` and `VERSIONS` cannot be set below 1. The default maximums for `MAX_AGE` and `VERSIONS`, defined in https://github.com/linkedin/openhouse/pull/259, are 3 days and 100 versions respectively.

Examples:
```
ALTER TABLE <database>.<table> SET POLICY (HISTORY MAX_AGE=24H VERSIONS=10)
ALTER TABLE <database>.<table> SET POLICY (HISTORY MAX_AGE=3D)
ALTER TABLE <database>.<table> SET POLICY (HISTORY VERSIONS=5)
```
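
For intuition, the combined policy behaves as an intersection filter over the snapshot log: a snapshot is kept only while it is both younger than `MAX_AGE` and among the newest `VERSIONS` snapshots. A minimal sketch of that selection rule (illustrative only; `Snapshot` and `keepSnapshots` are hypothetical names, not the actual retention-job code):

```
import java.time.Instant

// Hypothetical model of one snapshot-log entry; the real job walks Iceberg snapshot metadata.
case class Snapshot(id: Long, committedAt: Instant)

// Sketch of the HISTORY semantics: intersect the MAX_AGE filter with the VERSIONS cap.
def keepSnapshots(
    snapshots: Seq[Snapshot],
    maxAgeMillis: Option[Long], // from MAX_AGE, if configured
    versions: Option[Int],      // from VERSIONS, if configured
    now: Instant                // when the snapshot-retention job runs
): Seq[Snapshot] = {
  val newestFirst = snapshots.sortBy(s => -s.committedAt.toEpochMilli)
  // Drop snapshots older than MAX_AGE (no-op when MAX_AGE is unset).
  val youngEnough = maxAgeMillis.fold(newestFirst) { maxAge =>
    newestFirst.filter(s => now.toEpochMilli - s.committedAt.toEpochMilli <= maxAge)
  }
  // Of those, keep at most the newest VERSIONS (no-op when VERSIONS is unset).
  versions.fold(youngEnough)(n => youngEnough.take(n))
}
```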
## Changes

- [x] Client-facing API Changes
- [x] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

## Testing Done

- [x] Manually tested on local docker setup. Please include commands run, and their output.
- [x] Added new tests for the changes made.
- [x] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

Tested on a local Docker setup running Spark.

Setting both policies:
```
scala> spark.sql("ALTER TABLE openhouse.db.tb2 SET POLICY ( HISTORY MAX_AGE=3d VERSIONS=3 )").show
++
||
++
++

scala> spark.sql("SHOW TBLPROPERTIES openhouse.db.tb2 (policies)").show(truncate=false)
+--------+-----------------------------------------------------------------------------------------------------------------+
|key     |value                                                                                                            |
+--------+-----------------------------------------------------------------------------------------------------------------+
|policies|{ "sharingEnabled": false, "history": { "maxAge": 3, "granularity": "DAY", "versions": 3 } }                     |
+--------+-----------------------------------------------------------------------------------------------------------------+
```

Setting only versions:
```
scala> spark.sql("ALTER TABLE openhouse.db.tb2 SET POLICY ( HISTORY VERSIONS=20 )").show
++
||
++
++

scala> spark.sql("SHOW TBLPROPERTIES openhouse.db.tb2 (policies)").show(truncate=false)
+--------+----------------------------------------------------------------------------------------+
|key     |value                                                                                   |
+--------+----------------------------------------------------------------------------------------+
|policies|{ "sharingEnabled": false, "history": { "maxAge": 0, "versions": 20 } }                 |
+--------+----------------------------------------------------------------------------------------+
```

Setting only max age:
```
scala> spark.sql("ALTER TABLE openhouse.db.tb2 SET POLICY ( HISTORY MAX_AGE=8h )").show
++
||
++
++

scala> spark.sql("SHOW TBLPROPERTIES openhouse.db.tb2 (policies)").show(truncate=false)
+--------+------------------------------------------------------------------------------------------------------------------+
|key     |value                                                                                                             |
+--------+------------------------------------------------------------------------------------------------------------------+
|policies|{ "sharingEnabled": false, "history": { "maxAge": 8, "granularity": "HOUR", "versions": 0 } }                     |
+--------+------------------------------------------------------------------------------------------------------------------+
```

Also tested negative cases (invalid numbers, values past the maximums defined in https://github.com/linkedin/openhouse/pull/259), e.g.
```
scala> spark.sql("ALTER TABLE openhouse.db.tb SET POLICY ( HISTORY MAX_AGE=1h VERSIONS=2 )").show
org.apache.iceberg.exceptions.BadRequestException: 400 , {"status":"BAD_REQUEST","error":"Bad Request","message":" : History for the table [LocalHadoopCluster.db.tb] max age must be between 1 to 3 days","stacktrace":null,"cause":"Not Available"}
```

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.
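
For reference, the rejection in the negative case above happens server-side. A rough sketch of the bounds check implied by that error message and by the maximums from https://github.com/linkedin/openhouse/pull/259 (`validateHistoryPolicy` is an illustrative name, not the actual tables-service code):

```
// Illustrative only: bounds implied by the PR description and the error above
// (MAX_AGE between 1 and 3 days, VERSIONS between 1 and 100); not the service code.
def validateHistoryPolicy(maxAgeDays: Option[Int], versions: Option[Int]): Either[String, Unit] =
  if (maxAgeDays.isEmpty && versions.isEmpty)
    Left("At least one of MAX_AGE or VERSIONS must be specified")
  else if (maxAgeDays.exists(d => d < 1 || d > 3))
    Left("History max age must be between 1 and 3 days")
  else if (versions.exists(v => v < 1 || v > 100))
    Left("History versions must be between 1 and 100")
  else
    Right(())
```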
## Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

---------

Co-authored-by: Stas Pak
---
 .../SetHistoryPolicyStatementTest.java        | 112 ++++++++++++++++++
 .../extensions/OpenhouseSqlExtensions.g4      |  21 +++-
 .../OpenhouseSqlExtensionsAstBuilder.scala    |  37 ++++--
 .../plans/logical/SetHistoryPolicy.scala      |   9 ++
 .../v2/OpenhouseDataSourceV2Strategy.scala    |   4 +-
 .../datasources/v2/SetHistoryPolicyExec.scala |  45 +++++++
 .../plans/logical/SetHistoryPolicy.scala      |   9 ++
 .../datasources/v2/SetHistoryPolicyExec.scala |  45 +++++++
 8 files changed, 272 insertions(+), 10 deletions(-)
 create mode 100644 integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetHistoryPolicyStatementTest.java
 create mode 100644 integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala
 create mode 100644 integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala
 create mode 100644 integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala
 create mode 100644 integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala

diff --git a/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetHistoryPolicyStatementTest.java b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetHistoryPolicyStatementTest.java
new file mode 100644
index 000000000..9b8d7526b
--- /dev/null
+++ b/integrations/spark/spark-3.1/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/statementtest/SetHistoryPolicyStatementTest.java
@@ -0,0 +1,112 @@
+package com.linkedin.openhouse.spark.statementtest;
+
+import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseException;
+import java.nio.file.Files;
+import java.util.Optional;
+import lombok.SneakyThrows;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.ExplainMode;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class SetHistoryPolicyStatementTest {
+  private static SparkSession spark = null;
+
+  @SneakyThrows
+  @BeforeAll
+  public void setupSpark() {
+    Path unittest = new Path(Files.createTempDirectory("unittest_settablepolicy").toString());
+    spark =
+        SparkSession.builder()
+            .master("local[2]")
+            .config(
+                "spark.sql.extensions",
+                ("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
+                    + "com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions"))
+            .config("spark.sql.catalog.openhouse", "org.apache.iceberg.spark.SparkCatalog")
+            .config("spark.sql.catalog.openhouse.type", "hadoop")
.config("spark.sql.catalog.openhouse.warehouse", unittest.toString()) + .getOrCreate(); + } + + @Test + public void testSetHistoryPolicyGood() { + // Validate setting only time setting + Dataset ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24H)"); + assert isPlanValid(ds, "db.table", Optional.of("24"), Optional.of("HOUR"), Optional.empty()); + + ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=10)"); + assert isPlanValid(ds, "db.table", Optional.empty(), Optional.empty(), Optional.of("10")); + + // Validate both time and count setting + ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=2D VERSIONS=20)"); + assert isPlanValid(ds, "db.table", Optional.of("2"), Optional.of("DAY"), Optional.of("20")); + } + + @Test + public void testSetHistoryPolicyIncorrectSyntax() { + // No time granularity + Assertions.assertThrows( + OpenhouseParseException.class, + () -> spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24)").show()); + + // Count before time + Assertions.assertThrows( + OpenhouseParseException.class, + () -> + spark + .sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=10 MAX_AGE=24H)") + .show()); + + // No time or count + Assertions.assertThrows( + OpenhouseParseException.class, + () -> spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY )").show()); + } + + @BeforeEach + public void setup() { + spark.sql("CREATE TABLE openhouse.db.table (id bigint, data string) USING iceberg").show(); + spark.sql("CREATE TABLE openhouse.0_.0_ (id bigint, data string) USING iceberg").show(); + spark + .sql("ALTER TABLE openhouse.db.table SET TBLPROPERTIES ('openhouse.tableId' = 'tableid')") + .show(); + spark + .sql("ALTER TABLE openhouse.0_.0_ SET TBLPROPERTIES ('openhouse.tableId' = 'tableid')") + .show(); + } + + @AfterEach + public void tearDown() { + spark.sql("DROP TABLE openhouse.db.table").show(); + spark.sql("DROP TABLE openhouse.0_.0_").show(); + } + + @AfterAll + public void tearDownSpark() { + spark.close(); + } + + @SneakyThrows + private boolean isPlanValid( + Dataset dataframe, + String dbTable, + Optional maxAge, + Optional granularity, + Optional versions) { + String queryStr = dataframe.queryExecution().explainString(ExplainMode.fromString("simple")); + return queryStr.contains(dbTable) + && (!maxAge.isPresent() || queryStr.contains(maxAge.get())) + && (!granularity.isPresent() || queryStr.contains(granularity.get())) + && (!versions.isPresent() || queryStr.contains(versions.get())); + } +} diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 index 435a77e61..27cb980f2 100644 --- a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 +++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 @@ -26,10 +26,11 @@ statement : ALTER TABLE multipartIdentifier SET POLICY '(' retentionPolicy (columnRetentionPolicy)? 
     : ALTER TABLE multipartIdentifier SET POLICY '(' retentionPolicy (columnRetentionPolicy)? ')' #setRetentionPolicy
     | ALTER TABLE multipartIdentifier SET POLICY '(' replicationPolicy ')' #setReplicationPolicy
     | ALTER TABLE multipartIdentifier SET POLICY '(' sharingPolicy ')' #setSharingPolicy
+    | ALTER TABLE multipartIdentifier SET POLICY '(' historyPolicy ')' #setHistoryPolicy
     | ALTER TABLE multipartIdentifier MODIFY columnNameClause SET columnPolicy #setColumnPolicyTag
     | GRANT privilege ON grantableResource TO principal #grantStatement
     | REVOKE privilege ON grantableResource FROM principal #revokeStatement
-    | SHOW GRANTS ON grantableResource #showGrantsStatement
+    | SHOW GRANTS ON grantableResource #showGrantsStatement
     ;
 
 multipartIdentifier
@@ -65,7 +66,7 @@ quotedIdentifier
     ;
 
 nonReserved
-    : ALTER | TABLE | SET | POLICY | RETENTION | SHARING | REPLICATION
+    : ALTER | TABLE | SET | POLICY | RETENTION | SHARING | REPLICATION | HISTORY
     | GRANT | REVOKE | ON | TO | SHOW | GRANTS | PATTERN | WHERE | COLUMN
     ;
 
@@ -76,6 +77,7 @@ sharingPolicy
 BOOLEAN
     : 'TRUE' | 'FALSE'
     ;
+
 retentionPolicy
     : RETENTION '=' duration
     ;
@@ -153,12 +155,25 @@ policyTag
     : PII | HC
     ;
 
+historyPolicy
+    : HISTORY maxAge? versions?
+    ;
+
+maxAge
+    : MAX_AGE'='duration
+    ;
+
+versions
+    : VERSIONS'='POSITIVE_INTEGER
+    ;
+
 ALTER: 'ALTER';
 TABLE: 'TABLE';
 SET: 'SET';
 POLICY: 'POLICY';
 RETENTION: 'RETENTION';
 REPLICATION: 'REPLICATION';
+HISTORY: 'HISTORY';
 SHARING: 'SHARING';
 GRANT: 'GRANT';
 REVOKE: 'REVOKE';
@@ -182,6 +197,8 @@ HC: 'HC';
 MODIFY: 'MODIFY';
 TAG: 'TAG';
 NONE: 'NONE';
+VERSIONS: 'VERSIONS';
+MAX_AGE: 'MAX_AGE';
 
 POSITIVE_INTEGER
     : DIGIT+
diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
index 0619f8342..8120d7210 100644
--- a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
+++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
@@ -2,7 +2,7 @@ package com.linkedin.openhouse.spark.sql.catalyst.parser.extensions
 
 import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes
 import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseSqlExtensionsParser._
-import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
+import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetHistoryPolicy, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
 import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType
 import com.linkedin.openhouse.gen.tables.client.model.TimePartitionSpec
 import org.antlr.v4.runtime.tree.ParseTree
@@ -22,7 +22,7 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
     val (granularity, count) = typedVisit[(String, Int)](ctx.retentionPolicy())
     val (colName, colPattern) =
       if (ctx.columnRetentionPolicy() != null)
-      typedVisit[(String, String)](ctx.columnRetentionPolicy())
+        typedVisit[(String, String)](ctx.columnRetentionPolicy())
       else (null, null)
     SetRetentionPolicy(tableName, granularity, count, Option(colName), Option(colPattern))
   }
@@ -128,8 +128,8 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
     }
   }
 
-  override def visitColumnRetentionPolicyPatternClause(ctx: ColumnRetentionPolicyPatternClauseContext): (String) = {
-    (ctx.retentionColumnPatternClause().STRING().getText)
+  override def visitColumnRetentionPolicyPatternClause(ctx: ColumnRetentionPolicyPatternClauseContext): String = {
+    ctx.retentionColumnPatternClause().STRING().getText
   }
 
   override def visitSharingPolicy(ctx: SharingPolicyContext): String = {
@@ -149,7 +149,7 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
   }
 
   override def visitDuration(ctx: DurationContext): (String, Int) = {
-    val granularity = if (ctx.RETENTION_DAY != null) {
+    val granularity: String = if (ctx.RETENTION_DAY != null) {
       TimePartitionSpec.GranularityEnum.DAY.getValue()
     } else if (ctx.RETENTION_YEAR() != null) {
       TimePartitionSpec.GranularityEnum.YEAR.getValue()
@@ -158,13 +158,36 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
     } else {
       TimePartitionSpec.GranularityEnum.HOUR.getValue()
     }
-    val count = ctx.getText.substring(0, ctx.getText.length - 1).toInt
     (granularity, count)
   }
+
+  override def visitSetHistoryPolicy(ctx: SetHistoryPolicyContext): SetHistoryPolicy = {
+    val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
+    val (granularity, maxAge, versions) = typedVisit[(Option[String], Int, Int)](ctx.historyPolicy())
+    SetHistoryPolicy(tableName, granularity, maxAge, versions)
+  }
+  override def visitHistoryPolicy(ctx: HistoryPolicyContext): (Option[String], Int, Int) = {
+    val maxAgePolicy = if (ctx.maxAge() != null)
+      typedVisit[(String, Int)](ctx.maxAge().duration())
+    else (null, -1)
+    val versionPolicy = if (ctx.versions() != null)
+      typedVisit[Int](ctx.versions())
+    else -1
+    if (maxAgePolicy._2 == -1 && versionPolicy == -1) {
+      throw new OpenhouseParseException("At least one of MAX_AGE or VERSIONS must be specified in HISTORY policy, e.g. " +
" + + "ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=2D) or ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=3)", + ctx.start.getLine, ctx.start.getCharPositionInLine) + } + (Option(maxAgePolicy._1), maxAgePolicy._2, versionPolicy) + } + + override def visitVersions(ctx: VersionsContext): Integer = { + ctx.POSITIVE_INTEGER().getText.toInt + } + private def toBuffer[T](list: java.util.List[T]) = list.asScala - private def toSeq[T](list: java.util.List[T]): Seq[T] = toBuffer(list).toSeq + private def toSeq[T](list: java.util.List[T]) = toBuffer(list).toSeq private def typedVisit[T](ctx: ParseTree): T = { ctx.accept(this).asInstanceOf[T] diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala new file mode 100644 index 000000000..99b42c9b9 --- /dev/null +++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala @@ -0,0 +1,9 @@ +package com.linkedin.openhouse.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.plans.logical.Command + +case class SetHistoryPolicy (tableName: Seq[String], granularity: Option[String], maxAge: Int, versions: Int) extends Command { + override def simpleString(maxFields: Int): String = { + s"SetHistoryPolicy: ${tableName} ${if (maxAge > 0) "MAX_AGE=" + maxAge else ""}${granularity.getOrElse("")} ${if (versions > 0) "VERSIONS=" + versions else ""}" + } +} diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala index 8545a2bc1..e93f8a5b3 100644 --- a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala +++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala @@ -1,6 +1,6 @@ package com.linkedin.openhouse.spark.sql.execution.datasources.v2 -import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement} +import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, SetHistoryPolicy, ShowGrantsStatement} import org.apache.iceberg.spark.{Spark3Util, SparkCatalog, SparkSessionCatalog} import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.expressions.PredicateHelper @@ -17,6 +17,8 @@ case class OpenhouseDataSourceV2Strategy(spark: SparkSession) extends Strategy w SetRetentionPolicyExec(catalog, ident, granularity, count, colName, colPattern) :: Nil case SetReplicationPolicy(CatalogAndIdentifierExtractor(catalog, ident), replicationPolicies) => SetReplicationPolicyExec(catalog, ident, replicationPolicies) :: Nil + case SetHistoryPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, maxAge, versions) => + 
+      SetHistoryPolicyExec(catalog, ident, granularity, maxAge, versions) :: Nil
     case SetSharingPolicy(CatalogAndIdentifierExtractor(catalog, ident), sharing) =>
       SetSharingPolicyExec(catalog, ident, sharing) :: Nil
     case SetColumnPolicyTag(CatalogAndIdentifierExtractor(catalog, ident), policyTag, cols) =>
diff --git a/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala
new file mode 100644
index 000000000..271f9e192
--- /dev/null
+++ b/integrations/spark/spark-3.1/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala
@@ -0,0 +1,45 @@
+package com.linkedin.openhouse.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
+
+case class SetHistoryPolicyExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    granularity: Option[String],
+    maxAge: Int,
+    versions: Int
+) extends V2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
+        val key = "updated.openhouse.policy"
+        val value = {
+          (maxAge, versions) match {
+            case maxAgeOnly if versions == -1 => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}"}}"""
+            case versionsOnly if maxAge == -1 => s"""{"history":{"versions":${versions}}}"""
+            case _ => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}","versions":${versions}}}"""
+          }
+        }
+
+        iceberg.table().updateProperties()
+          .set(key, value)
+          .commit()
+
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot set history policy for non-Openhouse table: $table")
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"SetHistoryPolicyExec: ${catalog} ${ident} MAX_AGE=${if (maxAge > 0) maxAge else ""}${granularity.getOrElse("")} VERSIONS=${if (versions > 0) versions else ""}"
+  }
+}
diff --git a/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala
new file mode 100644
index 000000000..dc9899b9d
--- /dev/null
+++ b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetHistoryPolicy.scala
@@ -0,0 +1,9 @@
+package com.linkedin.openhouse.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.plans.logical.LeafCommand
+
+case class SetHistoryPolicy(tableName: Seq[String], granularity: Option[String], maxAge: Int, versions: Int) extends LeafCommand {
+  override def simpleString(maxFields: Int): String = {
+    s"SetHistoryPolicy: ${tableName} ${if (maxAge > 0) "MAX_AGE=" + maxAge else ""}${granularity.getOrElse("")} ${if (versions > 0) "VERSIONS=" + versions else ""}"
+  }
+}
diff --git a/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala
new file mode 100644
index 000000000..c429d1995
--- /dev/null
+++ b/integrations/spark/spark-3.5/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetHistoryPolicyExec.scala
@@ -0,0 +1,45 @@
+package com.linkedin.openhouse.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+
+case class SetHistoryPolicyExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    granularity: Option[String],
+    maxAge: Int,
+    versions: Int
+) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
+        val key = "updated.openhouse.policy"
+        val value = {
+          (maxAge, versions) match {
+            case maxAgeOnly if versions == -1 => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}"}}"""
+            case versionsOnly if maxAge == -1 => s"""{"history":{"versions":${versions}}}"""
+            case _ => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}","versions":${versions}}}"""
+          }
+        }
+
+        iceberg.table().updateProperties()
+          .set(key, value)
+          .commit()
+
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot set history policy for non-Openhouse table: $table")
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"SetHistoryPolicyExec: ${catalog} ${ident} MAX_AGE=${if (maxAge > 0) maxAge else ""}${granularity.getOrElse("")} VERSIONS=${if (versions > 0) versions else ""}"
+  }
+}