Skip to content

Commit

Permalink
Spark 3.2, 3.3, 3.4: Support specifying spec_id in RewriteManifestPro…
Browse files Browse the repository at this point in the history
  • Loading branch information
puchengy authored and devangjhabakh committed Apr 22, 2024
1 parent 5954c64 commit c9631f6
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,39 @@ public void testReplacePartitionField() {
row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
}

@Test
public void testWriteManifestWithSpecId() {
  // Verifies that rewrite_manifests honors the optional spec_id argument by building a table
  // whose manifests belong to two different partition specs and rewriting only one of them.
  sql(
      "CREATE TABLE %s (id int, dt string, hr string) USING iceberg PARTITIONED BY (dt)",
      tableName);
  // Turn off manifest merging on commit; the assertions below expect one manifest per insert.
  sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest-merge.enabled' = 'false')", tableName);

  // Two inserts under the original spec (id 0).
  sql("INSERT INTO %s VALUES (1, '2024-01-01', '00')", tableName);
  sql("INSERT INTO %s VALUES (2, '2024-01-01', '00')", tableName);
  assertEquals(
      "Should have 2 manifests and their partition spec id should be 0",
      ImmutableList.of(row(0), row(0)),
      sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));

  // Evolve the partition spec; the next insert is expected to land in a manifest with spec id 1.
  sql("ALTER TABLE %s ADD PARTITION FIELD hr", tableName);
  sql("INSERT INTO %s VALUES (3, '2024-01-01', '00')", tableName);
  assertEquals(
      "Should have 3 manifests and their partition spec id should be 0 and 1",
      ImmutableList.of(row(0), row(0), row(1)),
      sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));

  // Without spec_id the call is expected to report no changes.
  // NOTE(review): presumably the action defaults to the table's current spec (id 1), which
  // only has a single manifest here -- confirm against the RewriteManifests action.
  List<Object[]> output = sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent);
  assertEquals("Nothing should be rewritten", ImmutableList.of(row(0, 0)), output);

  // Targeting spec id 0 explicitly should compact its two manifests.
  // NOTE(review): the output row is presumably (rewritten count, added count) -- confirm
  // against the procedure's output type.
  output =
      sql(
          "CALL %s.system.rewrite_manifests(table => '%s', spec_id => 0)",
          catalogName, tableIdent);
  assertEquals("There should be 2 manifests rewritten", ImmutableList.of(row(2, 1)), output);
  assertEquals(
      "Should have 2 manifests and their partition spec id should be 0 and 1",
      ImmutableList.of(row(0), row(1)),
      sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class RewriteManifestsProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("use_caching", DataTypes.BooleanType)
ProcedureParameter.optional("use_caching", DataTypes.BooleanType),
ProcedureParameter.optional("spec_id", DataTypes.IntegerType)
};

// counts are not nullable since the action result is never null
Expand Down Expand Up @@ -85,6 +86,7 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
Integer specId = args.isNullAt(2) ? null : args.getInt(2);

return modifyIcebergTable(
tableIdent,
Expand All @@ -95,6 +97,10 @@ public InternalRow[] call(InternalRow args) {
action.option(RewriteManifestsSparkAction.USE_CACHING, useCaching.toString());
}

if (specId != null) {
action.specId(specId);
}

RewriteManifests.Result result = action.execute();

return toOutputRows(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,39 @@ public void testReplacePartitionField() {
row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
}

@Test
public void testWriteManifestWithSpecId() {
  // Verifies that rewrite_manifests honors the optional spec_id argument by building a table
  // whose manifests belong to two different partition specs and rewriting only one of them.
  sql(
      "CREATE TABLE %s (id int, dt string, hr string) USING iceberg PARTITIONED BY (dt)",
      tableName);
  // Turn off manifest merging on commit; the assertions below expect one manifest per insert.
  sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest-merge.enabled' = 'false')", tableName);

  // Two inserts under the original spec (id 0).
  sql("INSERT INTO %s VALUES (1, '2024-01-01', '00')", tableName);
  sql("INSERT INTO %s VALUES (2, '2024-01-01', '00')", tableName);
  assertEquals(
      "Should have 2 manifests and their partition spec id should be 0",
      ImmutableList.of(row(0), row(0)),
      sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));

  // Evolve the partition spec; the next insert is expected to land in a manifest with spec id 1.
  sql("ALTER TABLE %s ADD PARTITION FIELD hr", tableName);
  sql("INSERT INTO %s VALUES (3, '2024-01-01', '00')", tableName);
  assertEquals(
      "Should have 3 manifests and their partition spec id should be 0 and 1",
      ImmutableList.of(row(0), row(0), row(1)),
      sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));

  // Without spec_id the call is expected to report no changes.
  // NOTE(review): presumably the action defaults to the table's current spec (id 1), which
  // only has a single manifest here -- confirm against the RewriteManifests action.
  List<Object[]> output = sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent);
  assertEquals("Nothing should be rewritten", ImmutableList.of(row(0, 0)), output);

  // Targeting spec id 0 explicitly should compact its two manifests.
  // NOTE(review): the output row is presumably (rewritten count, added count) -- confirm
  // against the procedure's output type.
  output =
      sql(
          "CALL %s.system.rewrite_manifests(table => '%s', spec_id => 0)",
          catalogName, tableIdent);
  assertEquals("There should be 2 manifests rewritten", ImmutableList.of(row(2, 1)), output);
  assertEquals(
      "Should have 2 manifests and their partition spec id should be 0 and 1",
      ImmutableList.of(row(0), row(1)),
      sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class RewriteManifestsProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("use_caching", DataTypes.BooleanType)
ProcedureParameter.optional("use_caching", DataTypes.BooleanType),
ProcedureParameter.optional("spec_id", DataTypes.IntegerType)
};

// counts are not nullable since the action result is never null
Expand Down Expand Up @@ -85,6 +86,7 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
Integer specId = args.isNullAt(2) ? null : args.getInt(2);

return modifyIcebergTable(
tableIdent,
Expand All @@ -95,6 +97,10 @@ public InternalRow[] call(InternalRow args) {
action.option(RewriteManifestsSparkAction.USE_CACHING, useCaching.toString());
}

if (specId != null) {
action.specId(specId);
}

RewriteManifests.Result result = action.execute();

return toOutputRows(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,4 +331,39 @@ public void testReplacePartitionField() {
row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
}

@Test
public void testWriteManifestWithSpecId() {
  // Verifies that rewrite_manifests honors the optional spec_id argument by building a table
  // whose manifests belong to two different partition specs and rewriting only one of them.
  sql(
      "CREATE TABLE %s (id int, dt string, hr string) USING iceberg PARTITIONED BY (dt)",
      tableName);
  // Turn off manifest merging on commit; the assertions below expect one manifest per insert.
  sql("ALTER TABLE %s SET TBLPROPERTIES ('commit.manifest-merge.enabled' = 'false')", tableName);

  // Two inserts under the original spec (id 0).
  sql("INSERT INTO %s VALUES (1, '2024-01-01', '00')", tableName);
  sql("INSERT INTO %s VALUES (2, '2024-01-01', '00')", tableName);
  assertEquals(
      "Should have 2 manifests and their partition spec id should be 0",
      ImmutableList.of(row(0), row(0)),
      sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));

  // Evolve the partition spec; the next insert is expected to land in a manifest with spec id 1.
  sql("ALTER TABLE %s ADD PARTITION FIELD hr", tableName);
  sql("INSERT INTO %s VALUES (3, '2024-01-01', '00')", tableName);
  assertEquals(
      "Should have 3 manifests and their partition spec id should be 0 and 1",
      ImmutableList.of(row(0), row(0), row(1)),
      sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));

  // Without spec_id the call is expected to report no changes.
  // NOTE(review): presumably the action defaults to the table's current spec (id 1), which
  // only has a single manifest here -- confirm against the RewriteManifests action.
  List<Object[]> output = sql("CALL %s.system.rewrite_manifests('%s')", catalogName, tableIdent);
  assertEquals("Nothing should be rewritten", ImmutableList.of(row(0, 0)), output);

  // Targeting spec id 0 explicitly should compact its two manifests.
  // NOTE(review): the output row is presumably (rewritten count, added count) -- confirm
  // against the procedure's output type.
  output =
      sql(
          "CALL %s.system.rewrite_manifests(table => '%s', spec_id => 0)",
          catalogName, tableIdent);
  assertEquals("There should be 2 manifests rewritten", ImmutableList.of(row(2, 1)), output);
  assertEquals(
      "Should have 2 manifests and their partition spec id should be 0 and 1",
      ImmutableList.of(row(0), row(1)),
      sql("SELECT partition_spec_id FROM %s.manifests order by 1 asc", tableName));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class RewriteManifestsProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("use_caching", DataTypes.BooleanType)
ProcedureParameter.optional("use_caching", DataTypes.BooleanType),
ProcedureParameter.optional("spec_id", DataTypes.IntegerType)
};

// counts are not nullable since the action result is never null
Expand Down Expand Up @@ -85,6 +86,7 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
Integer specId = args.isNullAt(2) ? null : args.getInt(2);

return modifyIcebergTable(
tableIdent,
Expand All @@ -95,6 +97,10 @@ public InternalRow[] call(InternalRow args) {
action.option(RewriteManifestsSparkAction.USE_CACHING, useCaching.toString());
}

if (specId != null) {
action.specId(specId);
}

RewriteManifests.Result result = action.execute();

return toOutputRows(result);
Expand Down

0 comments on commit c9631f6

Please sign in to comment.