Skip to content

Commit

Permalink
Adds SQL support for Configurable Table Snapshot History (#262)
Browse files Browse the repository at this point in the history
## Summary

<!--- HINT: Replace #nnn with corresponding Issue number, if you are
fixing an existing issue -->

Adds Spark SQL support for configurable table snapshots, which controls
the versioning of the Openhouse tables.
Syntax is similar to retention but is instead defined as `HISTORY`.

History configuration supports both `MAX_AGE` and `VERSIONS`, where we
retain all table snapshots that live within `MAX_AGE` and within
`VERSIONS`.

Example: A table with `MAX_AGE = 1d` will retain all snapshots that are
within 1 day of when the snapshot retention job last ran.
A table with `VERSIONS = 5` will retain the last 5 snapshots of the
table without considering the age of the snapshots
If both `MAX_AGE = 1d` and `VERSIONS = 5` is defined, keep the last 5
snapshots within the last day. Note: If there are less than 5 snapshots,
then there were less than 5 commits done in the past day.

`MAX_AGE` and `VERSIONS` cannot be defined as less than 1.
The default maximums of `MAX_AGE` and `VERSIONS` defined in
#259 are 3 days and 100
versions respectively.

Examples:
```
ALTER TABLE <db>.<table> SET POLICY (HISTORY MAX_AGE=24H VERSIONS=10)
ALTER TABLE <db>.<table> SET POLICY (HISTORY MAX_AGE=3D)
ALTER TABLE <db>.<table> SET POLICY (HISTORY VERSIONS=5)
```

## Changes

- [x] Client-facing API Changes
- [x] Internal API Changes
- [ ] Bug Fixes
- [x] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [ ] Refactoring
- [ ] Documentation
- [x] Tests

For all the boxes checked, please include additional details of the
changes made in this pull request.

## Testing Done
<!--- Check any relevant boxes with "x" -->

- [x] Manually Tested on local docker setup. Please include commands
ran, and their output.
- [x] Added new tests for the changes made.
- [x] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please
feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in
production. Please explain.

Tested on local docker running spark:
Tested setting both policies
```
scala> spark.sql("ALTER TABLE openhouse.db.tb2 SET POLICY ( HISTORY MAX_AGE=3d VERSIONS=3 )").show
++
||
++
++


scala> spark.sql("SHOW TBLPROPERTIES openhouse.db.tb2 (policies)").show(truncate=false)
+--------+-----------------------------------------------------------------------------------------------------------------+
|key     |value                                                                                                            |
+--------+-----------------------------------------------------------------------------------------------------------------+
|policies|{
  "sharingEnabled": false,
  "history": {
    "maxAge": 3,
    "granularity": "DAY",
    "versions": 3
  }
}|
+--------+-----------------------------------------------------------------------------------------------------------------+

```

Setting only versions:
```
scala> spark.sql("ALTER TABLE openhouse.db.tb2 SET POLICY ( HISTORY VERSIONS=20 )").show
++
||
++
++


scala> spark.sql("SHOW TBLPROPERTIES openhouse.db.tb2 (policies)").show(truncate=false)
+--------+----------------------------------------------------------------------------------------+
|key     |value                                                                                   |
+--------+----------------------------------------------------------------------------------------+
|policies|{
  "sharingEnabled": false,
  "history": {
    "maxAge": 0,
    "versions": 20
  }
}|
+--------+----------------------------------------------------------------------------------------+
```

Setting only max age
```
scala> spark.sql("ALTER TABLE openhouse.db.tb2 SET POLICY ( HISTORY MAX_AGE=8h )").show
++
||
++
++


scala> spark.sql("SHOW TBLPROPERTIES openhouse.db.tb2 (policies)").show(truncate=false)
+--------+------------------------------------------------------------------------------------------------------------------+
|key     |value                                                                                                             |
+--------+------------------------------------------------------------------------------------------------------------------+
|policies|{
  "sharingEnabled": false,
  "history": {
    "maxAge": 8,
    "granularity": "HOUR",
    "versions": 0
  }
}|
+--------+------------------------------------------------------------------------------------------------------------------+
```

Also tested negative cases (invalid numbers, past maximums defined in
#259)
e.g.

```
scala> spark.sql("ALTER TABLE openhouse.db.tb SET POLICY ( HISTORY MAX_AGE=1h VERSIONS=2 )").show
org.apache.iceberg.exceptions.BadRequestException: 400 , {"status":"BAD_REQUEST","error":"Bad Request","message":" : History for the table [LocalHadoopCluster.db.tb] max age must be between 1 to 3 days","stacktrace":null,"cause":"Not Available"}


```

For all the boxes checked, include a detailed description of the testing
done for the changes made in this pull request.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the
description.

For all the boxes checked, include additional details of the changes
made in this pull request.

---------

Co-authored-by: Stas Pak <stpak@linkedin.com>
  • Loading branch information
Will-Lo and teamurko authored Dec 19, 2024
1 parent 0ad93ba commit abccdc3
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.linkedin.openhouse.spark.statementtest;

import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseParseException;
import java.nio.file.Files;
import java.util.Optional;
import lombok.SneakyThrows;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.ExplainMode;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SetHistoryPolicyStatementTest {
private static SparkSession spark = null;

@SneakyThrows
@BeforeAll
public void setupSpark() {
Path unittest = new Path(Files.createTempDirectory("unittest_settablepolicy").toString());
spark =
SparkSession.builder()
.master("local[2]")
.config(
"spark.sql.extensions",
("org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
+ "com.linkedin.openhouse.spark.extensions.OpenhouseSparkSessionExtensions"))
.config("spark.sql.catalog.openhouse", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.openhouse.type", "hadoop")
.config("spark.sql.catalog.openhouse.warehouse", unittest.toString())
.getOrCreate();
}

@Test
public void testSetHistoryPolicyGood() {
// Validate setting only time setting
Dataset<Row> ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24H)");
assert isPlanValid(ds, "db.table", Optional.of("24"), Optional.of("HOUR"), Optional.empty());

ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=10)");
assert isPlanValid(ds, "db.table", Optional.empty(), Optional.empty(), Optional.of("10"));

// Validate both time and count setting
ds = spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=2D VERSIONS=20)");
assert isPlanValid(ds, "db.table", Optional.of("2"), Optional.of("DAY"), Optional.of("20"));
}

@Test
public void testSetHistoryPolicyIncorrectSyntax() {
// No time granularity
Assertions.assertThrows(
OpenhouseParseException.class,
() -> spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=24)").show());

// Count before time
Assertions.assertThrows(
OpenhouseParseException.class,
() ->
spark
.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=10 MAX_AGE=24H)")
.show());

// No time or count
Assertions.assertThrows(
OpenhouseParseException.class,
() -> spark.sql("ALTER TABLE openhouse.db.table SET POLICY (HISTORY )").show());
}

@BeforeEach
public void setup() {
spark.sql("CREATE TABLE openhouse.db.table (id bigint, data string) USING iceberg").show();
spark.sql("CREATE TABLE openhouse.0_.0_ (id bigint, data string) USING iceberg").show();
spark
.sql("ALTER TABLE openhouse.db.table SET TBLPROPERTIES ('openhouse.tableId' = 'tableid')")
.show();
spark
.sql("ALTER TABLE openhouse.0_.0_ SET TBLPROPERTIES ('openhouse.tableId' = 'tableid')")
.show();
}

@AfterEach
public void tearDown() {
spark.sql("DROP TABLE openhouse.db.table").show();
spark.sql("DROP TABLE openhouse.0_.0_").show();
}

@AfterAll
public void tearDownSpark() {
spark.close();
}

@SneakyThrows
private boolean isPlanValid(
Dataset<Row> dataframe,
String dbTable,
Optional<String> maxAge,
Optional<String> granularity,
Optional<String> versions) {
String queryStr = dataframe.queryExecution().explainString(ExplainMode.fromString("simple"));
return queryStr.contains(dbTable)
&& (!maxAge.isPresent() || queryStr.contains(maxAge.get()))
&& (!granularity.isPresent() || queryStr.contains(granularity.get()))
&& (!versions.isPresent() || queryStr.contains(versions.get()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ statement
: ALTER TABLE multipartIdentifier SET POLICY '(' retentionPolicy (columnRetentionPolicy)? ')' #setRetentionPolicy
| ALTER TABLE multipartIdentifier SET POLICY '(' replicationPolicy ')' #setReplicationPolicy
| ALTER TABLE multipartIdentifier SET POLICY '(' sharingPolicy ')' #setSharingPolicy
| ALTER TABLE multipartIdentifier SET POLICY '(' historyPolicy ')' #setHistoryPolicy
| ALTER TABLE multipartIdentifier MODIFY columnNameClause SET columnPolicy #setColumnPolicyTag
| GRANT privilege ON grantableResource TO principal #grantStatement
| REVOKE privilege ON grantableResource FROM principal #revokeStatement
| SHOW GRANTS ON grantableResource #showGrantsStatement
| SHOW GRANTS ON grantableResource #showGrantsStatement
;

multipartIdentifier
Expand Down Expand Up @@ -65,7 +66,7 @@ quotedIdentifier
;

nonReserved
: ALTER | TABLE | SET | POLICY | RETENTION | SHARING | REPLICATION
: ALTER | TABLE | SET | POLICY | RETENTION | SHARING | REPLICATION | HISTORY
| GRANT | REVOKE | ON | TO | SHOW | GRANTS | PATTERN | WHERE | COLUMN
;

Expand All @@ -76,6 +77,7 @@ sharingPolicy
BOOLEAN
: 'TRUE' | 'FALSE'
;

retentionPolicy
: RETENTION '=' duration
;
Expand Down Expand Up @@ -153,12 +155,25 @@ policyTag
: PII | HC
;

historyPolicy
: HISTORY maxAge? versions?
;

maxAge
: MAX_AGE'='duration
;

versions
: VERSIONS'='POSITIVE_INTEGER
;

ALTER: 'ALTER';
TABLE: 'TABLE';
SET: 'SET';
POLICY: 'POLICY';
RETENTION: 'RETENTION';
REPLICATION: 'REPLICATION';
HISTORY: 'HISTORY';
SHARING: 'SHARING';
GRANT: 'GRANT';
REVOKE: 'REVOKE';
Expand All @@ -182,6 +197,8 @@ HC: 'HC';
MODIFY: 'MODIFY';
TAG: 'TAG';
NONE: 'NONE';
VERSIONS: 'VERSIONS';
MAX_AGE: 'MAX_AGE';

POSITIVE_INTEGER
: DIGIT+
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.linkedin.openhouse.spark.sql.catalyst.parser.extensions

import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes
import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseSqlExtensionsParser._
import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetHistoryPolicy, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType
import com.linkedin.openhouse.gen.tables.client.model.TimePartitionSpec
import org.antlr.v4.runtime.tree.ParseTree
Expand All @@ -22,7 +22,7 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
val (granularity, count) = typedVisit[(String, Int)](ctx.retentionPolicy())
val (colName, colPattern) =
if (ctx.columnRetentionPolicy() != null)
typedVisit[(String, String)](ctx.columnRetentionPolicy())
typedVisit[(String, String)](ctx.columnRetentionPolicy())
else (null, null)
SetRetentionPolicy(tableName, granularity, count, Option(colName), Option(colPattern))
}
Expand Down Expand Up @@ -128,8 +128,8 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
}
}

override def visitColumnRetentionPolicyPatternClause(ctx: ColumnRetentionPolicyPatternClauseContext): (String) = {
(ctx.retentionColumnPatternClause().STRING().getText)
override def visitColumnRetentionPolicyPatternClause(ctx: ColumnRetentionPolicyPatternClauseContext): String = {
ctx.retentionColumnPatternClause().STRING().getText
}

override def visitSharingPolicy(ctx: SharingPolicyContext): String = {
Expand All @@ -149,7 +149,7 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
}

override def visitDuration(ctx: DurationContext): (String, Int) = {
val granularity = if (ctx.RETENTION_DAY != null) {
val granularity: String = if (ctx.RETENTION_DAY != null) {
TimePartitionSpec.GranularityEnum.DAY.getValue()
} else if (ctx.RETENTION_YEAR() != null) {
TimePartitionSpec.GranularityEnum.YEAR.getValue()
Expand All @@ -158,13 +158,36 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
} else {
TimePartitionSpec.GranularityEnum.HOUR.getValue()
}

val count = ctx.getText.substring(0, ctx.getText.length - 1).toInt
(granularity, count)
}

override def visitSetHistoryPolicy(ctx: SetHistoryPolicyContext): SetHistoryPolicy = {
val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
val (granularity, maxAge, versions) = typedVisit[(Option[String], Int, Int)](ctx.historyPolicy())
SetHistoryPolicy(tableName, granularity, maxAge, versions)
}
override def visitHistoryPolicy(ctx: HistoryPolicyContext): (Option[String], Int, Int) = {
val maxAgePolicy = if (ctx.maxAge() != null)
typedVisit[(String, Int)](ctx.maxAge().duration())
else (null, -1)
val versionPolicy = if (ctx.versions() != null)
typedVisit[Int](ctx.versions())
else -1
if (maxAgePolicy._2 == -1 && versionPolicy == -1) {
throw new OpenhouseParseException("At least one of MAX_AGE or VERSIONS must be specified in HISTORY policy, e.g. " +
"ALTER TABLE openhouse.db.table SET POLICY (HISTORY MAX_AGE=2D) or ALTER TABLE openhouse.db.table SET POLICY (HISTORY VERSIONS=3)",
ctx.start.getLine, ctx.start.getCharPositionInLine)
}
(Option(maxAgePolicy._1), maxAgePolicy._2, versionPolicy)
}

override def visitVersions(ctx: VersionsContext): Integer = {
ctx.POSITIVE_INTEGER().getText.toInt
}

private def toBuffer[T](list: java.util.List[T]) = list.asScala
private def toSeq[T](list: java.util.List[T]): Seq[T] = toBuffer(list).toSeq
private def toSeq[T](list: java.util.List[T]) = toBuffer(list).toSeq

private def typedVisit[T](ctx: ParseTree): T = {
ctx.accept(this).asInstanceOf[T]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.linkedin.openhouse.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.plans.logical.Command

case class SetHistoryPolicy (tableName: Seq[String], granularity: Option[String], maxAge: Int, versions: Int) extends Command {
override def simpleString(maxFields: Int): String = {
s"SetHistoryPolicy: ${tableName} ${if (maxAge > 0) "MAX_AGE=" + maxAge else ""}${granularity.getOrElse("")} ${if (versions > 0) "VERSIONS=" + versions else ""}"
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.openhouse.spark.sql.execution.datasources.v2

import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, ShowGrantsStatement}
import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetReplicationPolicy, SetRetentionPolicy, SetSharingPolicy, SetHistoryPolicy, ShowGrantsStatement}
import org.apache.iceberg.spark.{Spark3Util, SparkCatalog, SparkSessionCatalog}
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
Expand All @@ -17,6 +17,8 @@ case class OpenhouseDataSourceV2Strategy(spark: SparkSession) extends Strategy w
SetRetentionPolicyExec(catalog, ident, granularity, count, colName, colPattern) :: Nil
case SetReplicationPolicy(CatalogAndIdentifierExtractor(catalog, ident), replicationPolicies) =>
SetReplicationPolicyExec(catalog, ident, replicationPolicies) :: Nil
case SetHistoryPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, maxAge, versions) =>
SetHistoryPolicyExec(catalog, ident, granularity, maxAge, versions) :: Nil
case SetSharingPolicy(CatalogAndIdentifierExtractor(catalog, ident), sharing) =>
SetSharingPolicyExec(catalog, ident, sharing) :: Nil
case SetColumnPolicyTag(CatalogAndIdentifierExtractor(catalog, ident), policyTag, cols) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.linkedin.openhouse.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.execution.datasources.v2.V2CommandExec

case class SetHistoryPolicyExec(
catalog: TableCatalog,
ident: Identifier,
granularity: Option[String],
maxAge: Int,
versions: Int
) extends V2CommandExec {

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.loadTable(ident) match {
case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
val key = "updated.openhouse.policy"
val value = {
(maxAge, versions) match {
case maxAgeOnly if versions == -1 => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}"}}"""
case versionsOnly if maxAge == -1 => s"""{"history":{"versions":${versions}}}"""
case _ => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}","versions":${versions}}}"""
}
}

iceberg.table().updateProperties()
.set(key, value)
.commit()

case table =>
throw new UnsupportedOperationException(s"Cannot set history policy for non-Openhouse table: $table")
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"SetHistoryPolicyExec: ${catalog} ${ident} MAX_AGE=${if (maxAge > 0) maxAge else ""}${granularity.getOrElse("")} VERSIONS=${if (versions > 0) versions else ""}"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.linkedin.openhouse.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.plans.logical.LeafCommand

case class SetHistoryPolicy(tableName: Seq[String], granularity: Option[String], maxAge: Int, versions: Int) extends LeafCommand {
override def simpleString(maxFields: Int): String = {
s"SetHistoryPolicy: ${tableName} ${if (maxAge > 0) "MAX_AGE=" + maxAge else ""}${granularity.getOrElse("")} ${if (versions > 0) "VERSIONS=" + versions else ""}"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.linkedin.openhouse.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec

case class SetHistoryPolicyExec(
catalog: TableCatalog,
ident: Identifier,
granularity: Option[String],
maxAge: Int,
versions: Int
) extends LeafV2CommandExec {

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.loadTable(ident) match {
case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
val key = "updated.openhouse.policy"
val value = {
(maxAge, versions) match {
case maxAgeOnly if versions == -1 => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}"}}"""
case versionsOnly if maxAge == -1 => s"""{"history":{"versions":${versions}}}"""
case _ => s"""{"history":{"maxAge":${maxAge},"granularity":"${granularity.get}","versions":${versions}}}"""
}
}

iceberg.table().updateProperties()
.set(key, value)
.commit()

case table =>
throw new UnsupportedOperationException(s"Cannot set history policy for non-Openhouse table: $table")
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"SetHistoryPolicyExec: ${catalog} ${ident} MAX_AGE=${if (maxAge > 0) maxAge else ""}${granularity.getOrElse("")} VERSIONS=${if (versions > 0) versions else ""}"
}
}

0 comments on commit abccdc3

Please sign in to comment.